Commits

Anonymous committed 1938fce Merge

sync w/ atlasoff

  • Participants
  • Parent commits 5d1f7f7, d1623a6
  • Branches waffle-repo, mob

Comments (0)

Files changed (28)

 ba4d3e1229ee84f7437ed87dd418c35eb8599a0e AthenaMP-00-03-23
 1bd59373ff9d389cfc676bb6b827ebd0b38274f0 AthenaMP-00-03-24
 9aa0f6a281d1f0893f9d401d29f37a0aab7ad4c2 AthenaMP-00-03-25
+21f4be9ae9cf47aea9f414d175f01be0b61ba6ab AthenaMP-00-03-26
+9d75fbb90290a8a971429763d4b9b5450f84c9ae AthenaMP-00-03-27
+1d7c2c6377073d1e9f2d1c7e993d5047b77844aa AthenaMP-00-03-28
+c8e8140256997e2385b6c7b22f130c7ea2b51ffe AthenaMP-00-03-29
+7b7c52ce4b62674771a3a510e7494f8c115e1a27 AthenaMP-00-03-30
+3cf6957af395f3f4959616d76d35a6630f1765dc AthenaMP-00-03-31
+d58b9561ffba3c5bf45018e3eb50714ac4b76c43 AthenaMP-00-03-32
+7e95fd9ed185a3860562d7f69f31e594b81e436b AthenaMP-00-03-33
+8a0e801b5bed3d8695da71821724d271bbd33d54 AthenaMP-00-03-34
+2012-03-22  Vakho Tsulaia  <tsulaia@mail.cern.ch>
+
+	* Fix hit merging problems when running with ATHENA_PROC_NUMBER environment
+
+2012-03-02  Sebastien Binet  <sebastien.binet@cern.ch>
+
+	* tagging AthenaMP-00-03-33
+	* preliminary support for d3pd reading with MP
+	* M python/PyComps.py
+
+2012-02-21  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * define sc in MpMergeUtils for the case where there are no files to merge
+
+2012-02-21  Vakho Tsulaia  <tsulaia@mail.cern.ch>
+
+	* Switch to hybrid merger for Hits files by default
+	* Put some protection into get_mp_root for systems with no LOGNAME
+
+2012-02-16  Vakho Tsulaia  <tsulaia@mail.cern.ch>
+
+	* Stop the job if some merger fails
+
+2012-02-07  Vakho Tsulaia  <tsulaia@mail.cern.ch>
+
+	* Change the way we handle mergePOOL.exe return values 
+
+2012-02-01  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * fixes for checkreq.sh
+
+2012-01-28  Vakho Tsulaia  <tsulaia@mail.cern.ch>
+
+	* Skip execution of the first event using filter algorithm. Switch to this mode by default
+	* Fix for merging TAG files
+	* Merge monitoring histograms using DQHistogramMerge.py
+
+2012-01-25  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * A bit of reorganization for allowing multiple groups after all.
+
+2012-01-20  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * Workaround for differences between asetup and GaudiPolicy. Enforce global flags
+          when loading _athenamp.
+	* tag AthenaMP-00-03-28
+
+2012-01-19  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * More improved error propagation for corner cases. Now enabled MpProcessing.
+
+2012-01-18  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * More exit code handling: different paths through PyComps give completely different
+          results. (Note that the current multiprocessing based code hangs, crashes, or does
+          whatever.) Adiabatic expansion still disabled.
+
+2012-01-17  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * Finalized some of the worker return handling (still following the old reporting
+          style, even as it is no longer necessary). Adiabatic expansion still disabled.
+
+2012-01-13  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * Further expansion of adiabatic changes (still disabled). Reworked python
+          function processing and argument passing. Started worker -> mother communication.
+
+2012-01-11  Vakho Tsulaia  <tsulaia@mail.cern.ch>
+
+	* Merger for DRAW files
+
+2012-01-10  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * Expansion of adiabatic changes (still disabled) with initializers and
+          better result returns
+
+2011-12-22  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * First codes for an attempt to adiabatically remove the unmaintainable
+	  python/C++ hybrid prototype code in order to be able to run AthenaMP
+	  in production. For now, this new code is disabled in use (modify
+	  PyComps to enable, or run share/tests/AMP_basictests.py).
+
+          Fix: use full include for more recent boost (message_queue has become
+          a template ... )
+
+2011-12-19  Vakho Tsulaia  <tsulaia@cern.ch>
+
+	* Add merger for RDOs
+
 2011-11-28  Peter van Gemmeren  <gemmeren@anl.gov>
 	* tag AthenaMP-00-03-26
 	* Fix event data merging to exclude new metadata DataHeader tree
 ## For Athena policies: it has to be the first use statement
 use AtlasPolicy 	AtlasPolicy-*
 
-## For Gaudi tools, services and objects
-use GaudiInterface 	GaudiInterface-* 	External
-
 #aliases
 alias mpMon             mpMon.py
 alias mp_genevt_test    mp_genevt_test.py
 ## make the library private so nobody can link against
 private
 
+## For Gaudi tools, services and objects
+use GaudiInterface      GaudiInterface-*        External
+
 ## Put here your package dependencies...
 use AtlasROOT		AtlasROOT-*		External
 use AtlasPython	 	AtlasPython-*		External 
 use AtlasPyROOT		AtlasPyROOT-*		External
+use AtlasBoost          AtlasBoost-*            External
 use AthenaKernel	AthenaKernel-*		Control
-use AthenaBaseComps	AthenaBaseComps-*	Control
 use AthenaPython	AthenaPython-*		Control
 
 use EventInfo		EventInfo-*		Event
  extract_mp_stat.py \
 "
 
-  
 library AthenaMP *.cxx components/*.cxx
 apply_pattern component_library
 
 macro AthenaMP_PyROOT_linkopts " -L$(ROOT_home)/lib -lPyROOT"
 macro_append AthenaMP_linkopts " $(AthenaMP_PyROOT_linkopts)"
+
+## apparently, GaudiPolicy and asetup do not agree on installation directories
+## for python extension modules, so change the GaudiPolicy macro to conform to
+## what asetup expects:
+macro python_bin_module_dir '$(tag)/python/lib-dynload' \
+               target-winxp '$(tag)\python\lib-dynload' \
+  use-shared-dir&target-winxp '$(tag)\lib/python2.6' \
+               use-shared-dir '$(tag)/lib/python2.6'
+
+## replacement of the multiprocessing features used by AthenaMP
+apply_pattern pyd_module module=_athenamp files=_athenamp/*.cxx deps=AthenaMP
+
+# TODO: figure out how to convince CMT to use the right set of linkopts (the
+# _use_linkopts naming is from the pyd_module pattern); an alternative is to
+# (build and) link with AthenaMPLib, but that runs into the same problem of
+# not having a set of linkopts for the module only
+#macro_append _athenamp_use_linkopts " -lrt"
+macro_append AthenaMP_linkopts " -lrt"
+macro_append AthenaMP_cppflags " -fno-strict-aliasing"
+
 end_private
 

python/AthenaMPFlags.py

     """
     statusOn = True
     allowedTypes = ['int']
-    StoredValue  = 1
+    StoredValue  = 0
 
 #not sure if this usable, have to convert into shared mp.Queue
 class EventQueue(JobProperty):

python/IoUtils.py

File contents unchanged.

python/MpMergeUtils.py

                     if os.path.isdir(os.path.join(wkdir, d))]
 
     msg.info("files for merging \n io_output_names= %s " % io_output_names)
-    
+
+    sc = 0
     #MAIN MERGING loop for various output files produced by workers
     for name in io_output_names:
         msg.info( "merging output: name=%s" % name)
                 elif isHITS:
                     # Merge HITS files
                     msg.info(" choice of merger: merge_hit_files")
+                    os.unsetenv('ATHENA_PROC_NUMBER')
                     merger = merge_hit_files
                 else:
                     pass
             else:
                 pass
 
+        # check if the input file is a DQ Histogram file
+        # DQ Histogram files are plain ROOT files, which have run_xxxxxxxx root directory in their ROOT folders
+        import ROOT
+        tFile = ROOT.TFile(files_exist[0])
+        if len(tFile.GetListOfKeys())==1:
+            keyRoot=tFile.GetListOfKeys()[0]
+            if keyRoot.IsFolder():
+                nameparts = keyRoot.GetName().split("_")
+                if len(nameparts)==2 and nameparts[0]=="run" and nameparts[1].isdigit():
+                    msg.info(" choice of merger: merge_dqhist_files")
+                    merger = merge_dqhist_files
+
         # If we have not been able to find a correct merger so far, then fail over to hadd
         if merger==None:
             msg.info(" choice of merger: merge_root_files")
 
         if sc != 0:
             msg.info( "merge_io_output: ERROR running merger job. " \
-                      "Check merging-%s.log file" % name) 
+                      "Check merging-%s.log file" % name)
+            return sc
         else:
             msg.info( 'merge_io_output: merging OK. ')
 
         import shutil
         shutil.copy2("PoolFileCatalog.xml", "MP_PoolFileCatalog.xml")
 
-    return #merge_io_output
+    return sc #merge_io_output
+
+def merge_dqhist_files(output_file, input_files, logfile="log.merge_root_files"):
+    # Uses DQHistogramMerge.py
+    msg.info("merge_dqhist_files: ")
+    import os, commands
+    if os.path.exists(output_file):
+        os.remove(output_file)
+
+    # Make an ascii file that contains list of input files for merging
+    monmerginginput = open('monmerginginput.txt','w')
+    for file in input_files:
+        monmerginginput.write("%s\n" % file)
+    monmerginginput.close()    
+
+    cmd = "DQHistogramMerge.py monmerginginput.txt %s" % output_file
+    msg.info( " firing DQ Histogram merging command:[%s]" % cmd)
+    sc, out = commands.getstatusoutput(cmd)
+    msg.info("merge_dqhist_files: sc=[%s], log_file=[%s] " % (sc, logfile))
+
+    write_merging_log(logfile, cmd, out, sc)
+    os.remove("monmerginginput.txt")
+    return sc % 256
 
 def merge_root_files(output_file, input_files, logfile="log.merge_root_files"):
     """merging list of input root files with identical tree structure into one output, produces log_file as well"""
 
 def merge_bs_files(output_file, input_files, logfile="log.merge_bs_files"):
     """dummy version of bs files merger"""
-    msg.info("merge_bs_files: [does nothing - to be implemented]")
-    #write_merging_log(logfile, cmd, out, sc)
-    sc = 0
-    return sc
+    msg.info("merge_bs_files: ")
+    import os, commands
+    if os.path.exists(output_file):
+        os.remove(output_file)
+
+    cmd = "AtlCopyBSEvent.exe -e all -o %s" % output_file
+    for file in input_files:
+        cmd = cmd + " %s" % file
+
+    msg.info( " firing merging command for merging bs files:[%s]" % cmd)
+    sc, out = commands.getstatusoutput(cmd)
+    msg.info("merge_bs_files: sc=[%s], log_file=[%s] " % (sc, logfile))
+        
+    write_merging_log(logfile, cmd, out, sc)
+    return sc % 256
 
 def merge_pool_collection_files(output_file, input_files, logfile="log.merge_pool_collection_files"):
     """ merge of pool collection files, like TAG.root"""
     if os.path.exists(output_file):
         os.remove(output_file)
 
-    merger_type="Merging_trf"
+    merger_type="merge_pool"
     
     # check if fast merging required
     #from AthenaMPFlags import jobproperties as jp    
                        output_file='metadata.pool.root')
     cfg.configure_job()
 
-    if cfg.is_rdo() or cfg.is_esd() or cfg.is_aod() or cfg.is_tag():
+    if cfg.is_esd() or cfg.is_aod() or cfg.is_tag():
         # main jobos
         include ('RecExCond/RecExCommon_flags.py')
         include ('RecExCommon/RecExCommon_topOptions.py')
         import AtlasGeoModel.SetGeometryVersion
         import AtlasGeoModel.GeoModelInit
         import AtlasGeoModel.SetupRecoGeometry
+    elif cfg.is_rdo():
+        from AthenaCommon.AppMgr import ServiceMgr
+        from AthenaPoolCnvSvc.AthenaPoolCnvSvcConf import AthenaPoolCnvSvc
+        ServiceMgr += AthenaPoolCnvSvc()
+        import AthenaPoolCnvSvc.ReadAthenaPool
+        from CLIDComps.CLIDCompsConf import ClassIDSvc
+        ServiceMgr += ClassIDSvc()
+        include('PartPropSvc/PartPropSvc.py')
+        include('RecExCond/AllDet_detDescr.py')
+
+        # TGCcablingServerSvc Hack
+        import MuonCnvExample.MuonCablingConfig
+
+        ServiceMgr.EventSelector.InputCollections = athenaCommonFlags.FilesInput()
+        ServiceMgr.EventSelector.SkipEvents = 100000
+
+        from AthenaPoolCnvSvc.WriteAthenaPool import AthenaPoolOutputStream
+        Out = 'metadata.pool.root'
+        StreamRDO = AthenaPoolOutputStream('StreamRDO',Out,True)
+        StreamRDO.TakeItemsFromInput=TRUE
+        StreamRDO.ForceRead=TRUE
+
+        # Copy InFile MetaData using MetaDataTools
+        from AthenaServices.AthenaServicesConf import AthenaOutputStream
+        StreamRDO_FH = AthenaOutputStream('StreamRDO_FH')
+        StreamRDO_FH.ItemList += ['IOVMetaDataContainer#']
+        StreamRDO.ExtendProvenanceRecord = False
+        ServiceMgr.AthenaPoolCnvSvc.MaxFileSizes = ['15000000000']
     else:
         pass
         
         return sc    
 
     # Step 2. Make events only file
-    import os, commands
+    import os, subprocess
     cmd = "mergePOOL.exe -o %s " % output_file
     for file in input_files:
         cmd = cmd + "-i %s " % file
     cmd = cmd + "-e MetaData -e MetaDataHdr -e MetaDataHdrDataHeaderForm -e MetaDataHdrDataHeader"
     msg.info( "firing hybrid pool merging command for event merging:[%s]" % cmd)
 
-    sc, out = commands.getstatusoutput(cmd)
-    msg.info("merge_pool_files_hybrid event merge: sc=[%s], log_file=[%s] " % (sc, logfile))
+    p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,close_fds=True)
     
     logfile.write("::::log-file:::::\n")
     logfile.write(" cmd: %s \n" % cmd )
     logfile.write("\n:::log-start::\n")
-    logfile.write("%s" % out)
+    while p.poll() is None:
+        line = p.stdout.readline()
+        if line:
+            logfile.write("%s" % line)
     logfile.write("\n:::log-end:::\n")
-    logfile.write("\n:::sc=%s:::\n" % sc)
+    logfile.write("\n:::sc=%s:::\n" % p.returncode)
 
-    if sc % 256 != 0:
+    msg.info("merge_pool_files_hybrid event merge: sc=[%s], log_file=[%s] " % (p.returncode, logfile))
+
+    if p.returncode!=0:
         if logfile not in (sys.stdout, sys.stderr):
             logfile.close()
-        return sc % 256
-
+        return p.returncode
 
     # Step 3. Produce final output
     cmd = "mergePOOL.exe -o %s -i metadata.pool.root" % output_file
     msg.info( "firing hybrid pool merging command for final merging:[%s]" % cmd)
 
-    sc, out = commands.getstatusoutput(cmd)
-    msg.info("merge_pool_files_hybrid final merge: sc=[%s], log_file=[%s] " % (sc, logfile))
+    p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,close_fds=True)
+    msg.info("merge_pool_files_hybrid final merge: sc=[%s], log_file=[%s] " % (p.returncode, logfile))
     logfile.write("::::log-file:::::\n")
     logfile.write(" cmd: %s \n" % cmd )
     logfile.write("\n:::log-start::\n")
-    logfile.write("%s" % out)
+    while p.poll() is None:
+        line = p.stdout.readline()
+        if line:
+            logfile.write("%s" % line)
     logfile.write("\n:::log-end:::\n")
-    logfile.write("\n:::sc=%s:::\n" % sc)
+    logfile.write("\n:::sc=%s:::\n" % p.returncode)
 
     if logfile not in (sys.stdout, sys.stderr):
         logfile.close()
     # remove the intermediate metadata.pool.root file
     os.remove("metadata.pool.root")
     
-    return sc % 256
+    return p.returncode
 
 def write_merging_log(logfile, cmd, out, sc):
     lf = open(logfile, 'w')

python/MpProcessing.py

+import multiprocessing, os, sys, types
+dlflags = sys.getdlopenflags()
+sys.setdlopenflags( 0x100 | 0x2 )    # RTLD_GLOBAL | RTLD_NOW
+import _athenamp as amp
+sys.setdlopenflags( dlflags )
+
+__all__ = [ 'cpu_count' ]
+
+
+# cpu_count is pure python (but does call sysconf for Linux)
+cpu_count = multiprocessing.cpu_count
+
+# the following sets are replacements, which are accessed on C++ through the
+# _athenamp extension module for now
+
+# current_process is used for identification purposes in multiprocessing; it
+# serves no real purpose in AthenaMP (it's only used for printing a message)
+# since there is a single relation of a mother process with multiple children
+
+def current_process():
+    '''
+    Return process object representing the current process
+    '''
+    return amp.Process( os.getpid() )
+
+# active_children does not match exactly, but since AthenaMP only starts
+# readers/workers/writers from the mother, an aggregate of all groups will
+# do; note also that b/c of the AthenaMP model, no cleanup is needed (as is
+# in multiprocessing: Pools could close and re-open there)
+
+_process_groups = []
+def active_children():
+    """
+    Return list of process objects corresponding to live child processes
+    """
+    ac = list()
+
+    global _process_groups
+    for g in _process_groups:
+        ac += g._children()
+    return ac
+
+
+##### class Queue
+Queue = amp.SharedQueue
+
+
+##### class Pool
+class MapResults( object ):
+    def __init__( self, group ):
+        self._group = group
+
+    def get( self, *args, **kw ):
+        status = self._group.wait()
+
+     # there are two parts to the exit code: the reported result from the worker
+     # function and the process' exit code
+
+     # TODO: handle process' exit code in a cleaner way (fix in PyComps?), as this
+     # obviously does not work in general ...
+
+        result = []
+        for proc_result in status:
+        # The result from run_worker_queue is extremely circumspect: it can either
+        # contain a single tuple of 5 entries, 1 tuple of 4 entries, or 2 tuples of
+        # 4 entries. It is checked on being 'OK' on the last entry of each tuple.
+        # By sheer coincidence, that happens to work.
+
+        # Now, the tuple can not be edited, so it's turned into a list, which takes
+        # care of the spurious arrangement of lengths. Then for all possible returns,
+        # the third entry is always theApp._exitstate and may need modification.
+
+        # General exceptions are eaten, b/c this is so PyComps specific. :P
+             r = proc_result[ 2 ]  # the python-posted result
+             try:
+                 import types
+                 if type(r[0]) == types.TupleType:              # early return case
+                     z = [ list(y) for y in r ]
+                     for l2 in z:
+                         if l2[2] == 0: l2[2] = proc_result[1]
+                 else:
+                     z = [ [ list(y) for y in x ] for x in r ]  # late return case
+                     for l1 in z:
+                         for l2 in l1:
+                             if l2[2] == 0: l2[2] = proc_result[1]
+             except Exception:
+             # make-believe? can happen e.g. on early exit() or e.g. SIGQUIT
+                 r = [[(proc_result[0], -1, proc_result[1], 'ERR')]]
+             result.append( r )
+        return result
+
+class Pool( object ):
+    packaged_count = 0
+
+    def __init__( self, processes = None, initializer = None, initargs = (),
+                  maxtasksperchild = None ):
+ 
+        if not callable( initializer ):
+            raise TypeError( 'initializer must be a callable' )
+
+      # this workaround is needed b/c initargs can (and do) contain an amp.SharedQueue,
+      # which can not be marshalled, but COW will deal with it properly by binding it
+      # into this local 'packaged_initializer'
+        def packaged_initializer( initializer = initializer, initargs = initargs ):
+            return initializer( *initargs )
+
+        self.packaged_count += 1
+        self._initializer = '_amp_pool_%s_%d' % (initializer.__name__,self.packaged_count)
+
+        import __main__
+        setattr( __main__, self._initializer, packaged_initializer )
+
+        self._group = amp.ProcessGroup( processes )
+        global _process_groups
+        _process_groups.append( self._group )
+
+    def map_async( self, func, iterable, chunksize=1 ):
+     # NOTE: initializer results are ignored (same as in multiprocessing)
+        self._group.map_async( self._initializer )
+
+     # TODO: hand over iterable properly (it just so happens that in AthenaMP, it is
+     # a repeated list of MaxEvent, for use of reading from the queue)
+        self._group.map_async( '%s.%s' % (func.__module__,func.__name__), iterable[0] )
+        return MapResults( self._group )
+
+    def close( self ):
+        self._group.map_async( 'exit' )
+
+        global _process_groups
+        _process_groups.remove( self._group )
+
+    def join( self ):
+        pass  # alternative functionality for now
+
+# other features of multiprocessing are not used by AthenaMP (but might be
+# used by others); the following facade forwards for now
+
+class ModuleFacade( types.ModuleType ):
+    def __init__( self, module, name ):
+        types.ModuleType.__init__( self, name )
+        self.__dict__[ 'module' ] = module
+
+        import multiprocessing
+        self.__dict__[ 'pmp' ] = multiprocessing
+
+    def __getattr__( self, attr ):
+        try:
+            return getattr( self.module, attr )
+        except AttributeError:
+            pass
+
+        import AthenaCommon.Logging, logging
+        log = logging.getLogger( 'AthenaMP.MpProcessing' )
+        log.error( 'missing attribute %s (falling back on multiprocessing)', attr )
+        return getattr( self.pmp, attr )
+   
+
+sys.modules[ __name__ ] = ModuleFacade( sys.modules[ __name__ ], __name__ )
+del ModuleFacade

python/PyComps.py

 
 #-----Python imports---#
 import os, sys, time
-import multiprocessing as mp
+#import multiprocessing as mp
+import AthenaMP.MpProcessing as mp
 
 #-----Athena imports---#
 import AthenaCommon.SystemOfUnits as Units
         from AthenaCommon import CfgMgr
         from AthenaCommon.Configurable import Configurable
         cfgs = Configurable.allConfigurables
+        has_athena_outstream = not (CfgMgr.AthenaOutputStream is None)
+        has_athena_regstream = not (CfgMgr.RegistrationStream is None)
         for cfg in cfgs.itervalues():
-            if isinstance (cfg, CfgMgr.AthenaOutputStream):
+            if (has_athena_outstream and
+                isinstance (cfg, CfgMgr.AthenaOutputStream)):
                 if not ioreg.has_key(cfg.name()):
                     ioreg[cfg.name()] = dict()
                 try:
                     ioreg[cfg.name()][cfg.OutputFile] = ['<output>', None]
                 except AttributeError:
                     msg.warning("The stream %s has no OutputFile attribute. Skip adding to IoRegistry" % cfg.name())
+            if (has_athena_regstream and 
+                isinstance (cfg, CfgMgr.RegistrationStream)):
+                if not ioreg.has_key(cfg.name()):
+                    ioreg[cfg.name()] = dict()
+                try:
+                    ioreg[cfg.name()][cfg.OutputCollection] = ['<output>', None]
+                except AttributeError:
+                    msg.warning("The registration stream %s has no OutputCollection attribute. Skip adding to IoRegistry" % cfg.name())
 ##             elif isinstance (cfg, CfgMgr.THistSvc):
 ##                 if not ioreg.has_key(cfg.name()):
 ##                     ioreg[cfg.name()] = dict()
         self.declare_property(\
             kw,
             property_name='EventsBeforeFork',
-            dflt_value=1,
+            dflt_value=0,
             attr_name='_events_before_fork',
             doc="""number of events to be run in the 'mother' process before forking the event workers. Default is 0 (fork after initialize)"""
             )
         
         self._athena_mp_root = jp.AthenaMPFlags.TmpDir() #default set above
         #self._event_queue = jp.AthenaMPFlags.EventQueue() #property doesn't exist
-        self._events_before_fork = jp.AthenaMPFlags.EventsBeforeFork() #Default is 1
+        self._events_before_fork = jp.AthenaMPFlags.EventsBeforeFork() #Default is 0
         self._cpu_list = jp.AthenaMPFlags.AffinityCPUList() #Default is []
         self._do_round_robin = jp.AthenaMPFlags.doRoundRobin() #default is False
         self._use_single_reader = jp.AthenaMPFlags.useSingleReader() #default is False
         self.msg.debug("_events_before_fork=%s" % self._events_before_fork)
         self.msg.debug("_cpu_list=%s" % self._cpu_list)
 
+        # Hack to force initialization and callbacks but avoid execute()
+        if self._events_before_fork==0:
+            installEventSkipFilter()
+
         from AthenaCommon import CfgMgr
         _cfg = getattr (CfgMgr, self._evtloop_mgr_type)
         self._wrapped_props = list()
 
             if hasattr(svcMgr,'ByteStreamInputSvc'):
                 totalnentries=trfutil.getCachedFileInfo(svcMgr.ByteStreamInputSvc.FullFileName,'nentries')
-            elif hasattr(svcMgr.EventSelector,'InputCollections'):
-                totalnentries=trfutil.getCachedFileInfo(svcMgr.EventSelector.InputCollections,'nentries')
+            elif hasattr(svcMgr, 'EventSelector'):
+                # root n-tuple
+                if (CfgMgr.Athena__RootNtupleEventSelector and
+                    isinstance(evtsel, CfgMgr.Athena__RootNtupleEventSelector)):
+                    for fname in list(evtsel.InputCollections):
+                        totalnentries.append(
+                            trfutil.ntup_entries(
+                                fname=fname,
+                                tree_names=evtsel.TupleName
+                                )
+                            )
+                    pass
+                # athena-pool
+                elif hasattr(svcMgr.EventSelector,'InputCollections'):
+                    totalnentries=trfutil.getCachedFileInfo(
+                        svcMgr.EventSelector.InputCollections,
+                        'nentries'
+                        )
 
             for nentry in totalnentries:
                 self._nevt += nentry
 
             Utils.watch()
             _info("MergeUtils.merge_io_output")
-            MergeUtils.merge_io_output(self.top_wkdir) # merging the output
-            Utils.watch(self.msg, "MERGING"); 
+            sc=MergeUtils.merge_io_output(self.top_wkdir) # merging the output
+            Utils.watch(self.msg, "MERGING");
+            if sc!=0:
+                return StatusCode.Failure
         
         return StatusCode.Success
     # IService - end
                 iAlgorithm(alg).Enable = True
 
             #now copy partial output to workers work directories
-            Utils.hack_copy(os.curdir, self.top_wkdir) 
+            Utils.hack_copy(os.curdir, self.top_wkdir)
+        else:
+            self.nextEvent(1)
+            from GaudiPython.Bindings import iAlgorithm
+            iAlgorithm('EventSkipFilter').Enable = False
+            
         Utils.watch(self.msg, "FIRSTEVENT")
         
         fds_dict.extract_fds()
         if not self._do_round_robin: 
             # a shared queue to distribute event numbers
             item_nbr = int(maxevt) + int(self._ncpus) + 5
-            _debug ("QUEUE will have [%i] items: %i evts to process and %i Nones to notify \
-                    workers about the end of queue" % (item_nbr, maxevt, 5 + self._ncpus ) )
+            _debug ("QUEUE will have [%i] items: %i evts to process and %i Nones to notify" \
+                    "workers about the end of queue" % (item_nbr, maxevt, 5 + self._ncpus ) )
             self._event_queue = mp.Queue(item_nbr)
-            _info ("a shared queue with [%i] evts  for [%i] workers in parallel \
-                    ..." % (maxevt, self._ncpus) )
+            _info ("a shared queue with [%i] evts  for [%i] workers in parallel" \
+                    "..." % (maxevt, self._ncpus) )
             evt_list = list()
             #for i in xrange(maxevt - self._events_before_fork): # event order list
             #    evt_nbr = i + self._events_before_fork 
    #import sys
    #from AthenaCommon.Logging import log as msg
    msg.debug("setupEvtSelForSeekOps:")
+   if sys.modules.has_key('AthenaRootComps.ReadAthenaRoot'):
+       # athenarootcomps has seeking enabled by default
+       msg.info('=> Seeking enabled.')
+       return
+   
    if not sys.modules.has_key('AthenaPoolCnvSvc.ReadAthenaPool'):
       ## user did not import that module so we give up
       msg.info( "Cannot enable 'seeking' b/c module " + \
     evtloop_mgr.msg.info ("run_reader [%s-%s]: nbr of events to process: %i",
                           ppid,pid,evts)
     sc = evtloop_mgr.readEvent (evts)
-    if not sc.isSuccess():
+    if sc != Success:
         evtloop_mgr.msg.error ("failed to read %i events", evts)
         out.append ((pid,evts,app._exitstate, 'ERR'))
         return out
 
     Utils.watch()
 
-    _elm.nbr = _elm._events_before_fork 
+    # Hack to force initialization and callbacks but avoid execute() in case _events_before_fork=0
+    if _elm._events_before_fork == 0:
+        _elm.nbr = 1
+    else:
+        _elm.nbr = _elm._events_before_fork
+
     _elm.processed_evts = list()
     def get_event():
         e = _elm._event_queue.get()
             return out
         ost1=os.times(); t1 = time.time();
         print_tuple = (pid, Utils.get_cpu(pid), evt, t1-t0) + tuple( map(operator.sub, ost1, ost0))
-        _debug("WORKER_EVENT_STAT: pid=%i cpu=%i evt=%i \
-                elap_time=%.6f user_time=%.6f system_time=%.6f \
-                cutime=%.6f cstime=%.6f elap_os_time=%.6f" % print_tuple )
+        _debug("WORKER_EVENT_STAT: pid=%i cpu=%i evt=%i " \
+                "elap_time=%.6f user_time=%.6f system_time=%.6f " \
+                "cutime=%.6f cstime=%.6f elap_os_time=%.6f" % print_tuple )
         _debug( "SO FAR THIS WORKER PROCESSED %i events: %s " % (_elm.nbr - _elm._events_before_fork, _elm.processed_evts) )
         evt = get_event()
 
     sc = 'OK' if app.finalize().isSuccess() else 'ERR'
     out.append ((pid,-1,app._exitstate, sc))
     return out
+
+def installEventSkipFilter():
+    def eventskipfilter(run,evt):
+        return False
+    from AthenaCommon.AlgSequence import AthSequencer
+    from GaudiSequencer.PyComps import PyEvtFilter
+    seq = AthSequencer('AthFilterSeq')
+    seq += PyEvtFilter('EventSkipFilter')
+    seq.EventSkipFilter.filter_fct=eventskipfilter
+    
             tmp_root = tempfile.tempdir
     else:
         msg.debug("Using ATHENA_MP_TMPDIR environment variable to set athena-mp dir")
-    return  os.sep.join([tmp_root,"athena-mp-tmp-%s" % os.environ['LOGNAME']])
+    username = "MP"    
+    if os.getenv("LOGNAME") != None :
+        username = os.getenv("LOGNAME")
+    elif os.getenv("USER") != None :
+        username = os.getenv("USER")
+    return  os.sep.join([tmp_root,"athena-mp-tmp-%s" % username])
 
 def hack_copy(srcDir, destDir):
     """ explicitly copy files not captured by IoRegistry"""

share/tests/AMP_basictests.py

+import sys, os, unittest
+sys.path.append( os.path.join( os.getcwd(), os.pardir ) )
+
+from common import *
+
+__all__ = [
+   'Basic01ModuleTestCase',
+   'Basic02ExecutionTestCase',
+   'Basic03GroupTestCase',
+   'Basic04SharedQueueTestCase',
+]
+
+if '--build' in sys.argv:
+   res = os.system( "cd ../../cmt; make QUICK=1" )
+   if res:
+      sys.exit( res )
+
+
+### basic module test cases ==================================================
+class Basic01ModuleTestCase( MyTestCase ):
+   def test01API( self ):
+      """Test module loading and API existence"""
+
+      import _athenamp
+
+      self.assert_( hasattr( _athenamp, 'launch' ) )
+      self.assert_( hasattr( _athenamp, 'ProcessGroup' ) )
+      self.assert_( hasattr( _athenamp, 'Process' ) )
+      self.assert_( hasattr( _athenamp, 'SharedQueue' ) )
+
+   def test02ArgumentsAndErrors( self ):
+      """Test basic faulty argument error handling"""
+
+      import _athenamp
+
+      self.assertRaises( TypeError, _athenamp.launch, 1 )
+
+   def test03Instantiations( self ):
+      """Test class instantiations"""
+
+      import _athenamp
+
+      proc = _athenamp.Process( -1 )
+      self.assertEqual( proc.pid, -1 )
+
+      proc = _athenamp.Process( pid = -1 )
+      self.assertEqual( proc.pid, -1 )
+
+      group = _athenamp.ProcessGroup()
+      group = _athenamp.ProcessGroup( 4 )
+      group = _athenamp.ProcessGroup( nprocs = 4 )
+
+      queue = _athenamp.SharedQueue()
+      queue = _athenamp.SharedQueue( 100 )
+
+
### basic execution test cases ===============================================
class Basic02ExecutionTestCase( MyTestCase ):
   # Tests that fork real children via _athenamp.launch(); setUp/tearDown
   # verify no stray child processes exist before or after each test.

   def _checkChildren( self ):
    # the following tests that there are no children running
      self.assertRaises( OSError, os.wait )

   def setUp( self ):
      self._checkChildren()

   def tearDown( self ):
      self._checkChildren()

   def test01RunChild( self ):
      """Test running and destruction of a child"""

      import _athenamp

      proc = _athenamp.launch()
      self.assert_( 0 <= proc.pid )

      if proc.pid == 0:
         # child: block until killed by the mother below
         import signal
         signal.pause()
      else:
         # mother: give the child time to reach pause(), then kill and reap it
         import time, signal
         time.sleep(1)
         sigtosend = signal.SIGKILL
         os.kill( proc.pid, sigtosend )
         result = os.waitpid( proc.pid, 0 )
         self.assertEqual( result[0], proc.pid )
         # raw wait status: low byte is the terminating signal number
         self.assertEqual( result[1], sigtosend )

   def test02RunChildren( self ):
      """Test running and destruction of a group of children"""

      import _athenamp

      pids = []

      leader = _athenamp.launch()
      if leader.pid == 0:     # make child wait
         import signal
         signal.pause()
      else:
         # first child becomes the leader of a new process group
         os.setpgid( leader.pid, 0 )
         pids.append( leader.pid )

      for i in range( 2 ):
         proc = _athenamp.launch();
         self.assert_( 0 <= proc.pid )

         if proc.pid == 0:    # make all children wait
            import signal
            signal.pause()
         else:
            assert leader.pid
            os.setpgid( proc.pid, os.getpgid( leader.pid ) )
            pids.append( proc.pid )

      # kill the whole group with one signal and reap every member
      import time, signal
      time.sleep( 1 )
      sigtosend = signal.SIGKILL
      pgid = os.getpgid( leader.pid )
      os.killpg( pgid, sigtosend )
      while pids:
         result = os.waitpid( -pgid, 0)
         self.assert_( result[0] in pids )
         self.assertEqual( result[1], sigtosend )
         pids.remove( result[0] )
+
+
### basic group usage test cases =============================================
class Basic03GroupTestCase( MyTestCase ):
   def test01GroupLifetime( self ):
      """Test creation and life time of a group"""

      import _athenamp

      group = _athenamp.ProcessGroup( 4 )

    # nothing started yet, so waiting should simply return
      self.assertEqual( group.wait(), () )
      self.assertEqual( group.wait( 0 ), () )
      self.assertEqual( group.wait( options = 0 ), () )

   def test02RunMapAsync( self ):
      """Test no-op running of map_async on a worker group"""

      import _athenamp

      group = _athenamp.ProcessGroup( 4 )
      group.map_async( "exit" )      # workers are forked lazily, i.e. here
      self.assertEqual( len(group._children()), 4 )   # now instantiated

      # each status tuple's second field is the exit code; 0 == clean exit
      status = group.wait()
      self.assertEqual( [ x[1] for x in status ], 4*[0,] )

   def test03PythonTaskMapAsync( self ):
      """Test running a python task via map_async on a worker group"""

      import _athenamp, __main__

      def myfunc():
      #  print 'called myfunc'
          return 1
      __main__.myfunc = myfunc

    # existing function with return value
      group = _athenamp.ProcessGroup( 4 )
      group.map_async( "myfunc" )
      status = group.wait()

      # exit code 0 and third field carrying the function's return value
      self.assertEqual( [ x[1] for x in status ], 4*[0,] )
      self.assertEqual( [ x[2] for x in status ], 4*[1,] )

    # non-existing function, leading to failure
      group = _athenamp.ProcessGroup( 4 )
      group.map_async( "no_such_func" )
      status = group.wait()

      # 0x0B: the worker's "scheduled function produced no output" exit code
      self.assertEqual( [ x[1] for x in status ], 4*[0x0B,] )
+
+
### basic group usage test cases =============================================
class Basic04SharedQueueTestCase( MyTestCase ):
   def test01SharedQueueSending( self ):
      """Test put functionality of shared queue"""

      import _athenamp, random

      q = _athenamp.SharedQueue( 5 )      # capacity of five messages

      # a message larger than the per-message buffer must be refused
      r = random.Random( 1 )
      largebuf = ''.join( [ str(r.random()) for i in range(4096) ] )
      self.assertRaises( OverflowError, q.put_nowait, largebuf ) # too large for buffer

      for i in range(5):
         q.put_nowait( i )
      self.assertRaises( OverflowError, q.put_nowait, 5 )        # too many elements

      # FIFO order; draining an empty queue raises EOFError
      for i in range(5):
         self.assertEqual( q.get_nowait(), i )
      self.assertRaises( EOFError, q.get_nowait )

      # messages with embedded NUL bytes must round-trip unchanged
      sdata = [ "text", "text\0with\0null", "morenull\0\0" ]
      for t in sdata:
         q.put_nowait( t )
         self.assertEqual( q.get_nowait(), t )
+
+
## actual test run
if __name__ == '__main__':
   from MyTextTestRunner import MyTextTestRunner

   # collect every test case class defined in this module
   loader = unittest.TestLoader()
   testSuite = loader.loadTestsFromModule( sys.modules[ __name__ ] )

   # exit code 0 on success, 1 on any failure/error
   runner = MyTextTestRunner( verbosity = 2 )
   result = not runner.run( testSuite ).wasSuccessful()

   sys.exit( result )

share/tests/MyTextTestRunner.py

+import unittest
+
if hasattr( unittest, 'TextTestResult' ):
   # py2.7/py3: report only each test's short description (docstring line)
   class MyTextTestResult( unittest.TextTestResult ):
      def getDescription(self, test):
         return test.shortDescription()
else:
   # older pythons: placeholder so the module still imports; pre-2.7
   # TextTestRunner has no 'resultclass' hook, so this is never instantiated
   class MyTextTestResult( object ):
      pass
+
+
class MyTextTestRunner( unittest.TextTestRunner ):
   """TextTestRunner variant that prints one-line test descriptions
   (via MyTextTestResult) and a bare OK/FAILED summary without the
   standard "Ran N tests in X.XXXs" timing line."""

   resultclass = MyTextTestResult

   def run( self, test ):
      """Run the given test case or test suite."""

      result = self._makeResult()
      test( result )
      result.printErrors()
      self.stream.writeln( result.separator2 )
      # (fixed: dropped unused local 'run = result.testsRun')
      self.stream.writeln()

      if not result.wasSuccessful():
         # e.g. "FAILED (failures=2, errors=1)"
         self.stream.write( "FAILED (" )
         failed, errored = map( len, ( result.failures, result.errors ) )
         if failed:
            self.stream.write( "failures=%d" % failed )
         if errored:
            if failed: self.stream.write( ", " )
            self.stream.write( "errors=%d" % errored )
         self.stream.writeln( ")" )
      else:
         self.stream.writeln( "OK" )

      return result

share/tests/common.py

+# File: roottest/python/common.py
+# Author: Wim Lavrijsen (LBNL, WLavrijsen@lbl.gov)
+# Created: 09/24/10
+# Last: 09/30/10
+
__all__ = [ 'pylong', 'maxvalue', 'MyTestCase' ]

import os, sys, unittest

# Version shims: a common integer "long" alias, the platform max integer,
# and a TestCase base with consistent short descriptions on py2 and py3.
if sys.hexversion >= 0x3000000:
   # python3: no separate 'long' type, and sys.maxint is gone
   pylong = int
   maxvalue = sys.maxsize

   class MyTestCase( unittest.TestCase ):
      def shortDescription( self ):
         # fall back to str(self) when there is no docstring first line
         # (py3's default returns None in that case)
         desc = str(self)
         doc_first_line = None

         if self._testMethodDoc:
            doc_first_line = self._testMethodDoc.split("\n")[0].strip()
         if doc_first_line:
            desc = doc_first_line
         return desc
else:
   # python2: stock behavior already matches
   pylong = long
   maxvalue = sys.maxint

   class MyTestCase( unittest.TestCase ):
      pass

src/_athenamp/AthenaMP/IdentifiedSharedQueue.h

#ifndef ATHENAMP_IDENTIFIEDSHAREDQUEUE_H
#define ATHENAMP_IDENTIFIEDSHAREDQUEUE_H

#include "AthenaMP/SharedQueue.h"

#include <unistd.h>

namespace AthenaMP {

// A SharedQueue whose send() prefixes each message with the sender's pid
// and whose receive() strips that prefix again, optionally reporting the
// pid to the caller. Used as the common "outbox" that all worker children
// post results on, so the mother can tell which child sent what.
class IdentifiedSharedQueue : public SharedQueue {
public:

public:
   IdentifiedSharedQueue();
   IdentifiedSharedQueue( const std::string& name,
                int max_msg = SHAREDQUEUE_MAX_MSG, bool do_unlink = true );

public:
   virtual bool try_send( const std::string& );     // non-blocking
   virtual bool send( const std::string& );

   virtual std::string try_receive();                // non-blocking
   virtual std::string try_receive( pid_t& id );     // id: sender pid, -1 if none
   virtual std::string receive();
   virtual std::string receive( pid_t& id );
};

} // namespace AthenaMP

#endif // !ATHENAMP_IDENTIFIEDSHAREDQUEUE_H

src/_athenamp/AthenaMP/Process.h

#ifndef ATHENAMP_PROCESS_H
#define ATHENAMP_PROCESS_H

#include "AthenaMP/SharedQueue.h"
#include "AthenaMP/IdentifiedSharedQueue.h"

#include <string>
#include <unistd.h>


namespace AthenaMP {

// A malloc'd work buffer plus its size; consumer frees both the data and
// the struct itself (see Process::mainloop).
struct ScheduledWork {
   void* data;
   int size;
};

// Signature of functions schedulable on a worker: consume one buffer,
// return a newly malloc'd one (or NULL on error).
typedef ScheduledWork* (*scheduled_func)( ScheduledWork* databuf );

// Handle on a forked worker process: work goes in through a per-process
// inbox queue, results come back on a shared, pid-tagged outbox.
class Process {
public:
// fork a new process; pid is 0 in the child, the child's pid in the mother
   static Process launch();

public:
   explicit Process( pid_t pid );
   Process( const Process& other );
   Process& operator=( const Process& other );
   virtual ~Process();

   pid_t getProcessID() const;

// attach the queue the mother uses to send work to this process
   bool connectIn( const SharedQueue& queue );
// attach the (shared) queue this process posts results back on
   bool connectOut( const IdentifiedSharedQueue& queue );
// post a (function name, serialized args) pair on the inbox; non-blocking
   bool schedule(
      const std::string& func, const std::string& args = "" );

// worker-side message loop; returns the worker's exit code (returns 1
// immediately when called in the mother, i.e. m_pid != 0)
   int mainloop();

private:
   SharedQueue           m_inbox;     // work sent by the mother
   IdentifiedSharedQueue m_outbox;    // results posted back, tagged with pid
   pid_t m_pid;                       // 0 in the child; child's pid in mother
};

} // namespace AthenaMP

#endif // !ATHENAMP_PROCESS_H

src/_athenamp/AthenaMP/ProcessGroup.h

#ifndef ATHENAMP_PROCESSGROUP_H
#define ATHENAMP_PROCESSGROUP_H

#include "AthenaMP/Process.h"
#include "AthenaMP/IdentifiedSharedQueue.h"
#include <vector>


namespace AthenaMP {

// Per-child result of a wait(): exit code plus any output the child
// posted back on the group's inbox (malloc'd; filled in ProcessGroup::wait).
struct ProcessResult {
   pid_t pid;
   int   exitcode;
   ScheduledWork output;
};

// A set of forked workers placed in one POSIX process group; work is
// scheduled on all of them with map_async() and reaped with wait().
class ProcessGroup {
public:
// nprocs < 0 (default): one worker per available core
   explicit ProcessGroup( int nprocs = -1 );
   virtual ~ProcessGroup();

   pid_t getGroupID() const;

// schedule 'func' on every child (forking them on first use); 0 on success
   int map_async( const std::string& func, const std::string& args = "" );
// reap all children into 'status'; 0 on success or errno from waitpid()
   int wait( std::vector< ProcessResult >& status, int options = 0 );

// the following should be temporary (is needed for the python
// interface, but it is better broken up into the actual needs)
   const std::vector< Process >& getChildren() const;

private:
   bool create();

private:
   std::vector< Process > m_processes;
   IdentifiedSharedQueue m_inbox;     // shared outbox of all children
   int m_nprocs;
   pid_t m_pgid;
};

} // namespace AthenaMP

#endif // !ATHENAMP_PROCESSGROUP_H

src/_athenamp/AthenaMP/SharedQueue.h

#ifndef ATHENAMP_SHAREDQUEUE_H
#define ATHENAMP_SHAREDQUEUE_H

// full include needed, as message_queue has changed into a
// template for more recent versions of boost
#include <boost/interprocess/ipc/message_queue.hpp>

#include <string>


namespace AthenaMP {

// default maximum number of pending messages per queue
static const int SHAREDQUEUE_MAX_MSG = 100;

// Copyable handle on a named boost.interprocess message_queue, carrying
// string messages between the mother and the worker processes.
// NOTE(review): the pointer members (m_queue/m_name/m_count) suggest that
// copies share one underlying queue with reference counting -- confirm
// against copy()/destroy() in SharedQueue.cxx (not visible here).
class SharedQueue {
public:

public:
   SharedQueue();
   SharedQueue( const std::string& name,
                int max_msg = SHAREDQUEUE_MAX_MSG, bool do_unlink = true );
   SharedQueue( const SharedQueue& other );
   SharedQueue& operator=( const SharedQueue& other );
   ~SharedQueue();

public:
// name under which the queue was created (empty if unattached)
   std::string name() const;

   virtual bool try_send( const std::string& );     // non-blocking
   virtual bool send( const std::string& );

   virtual std::string try_receive();               // non-blocking
   virtual std::string receive();

// true when an underlying message queue is attached
   operator bool() const {
      return (bool)m_queue;
   }

protected:
// direct access to the underlying queue, for derived classes
   boost::interprocess::message_queue* operator->() {
      return m_queue;
   }

private:
   void copy( const SharedQueue& other );
   void destroy();

private:
   boost::interprocess::message_queue* m_queue;
   std::string* m_name;
   int* m_count;
};

} // namespace AthenaMP

#endif // !ATHENAMP_SHAREDQUEUE_H

src/_athenamp/IdentifiedSharedQueue.cxx

+#include "AthenaMP/IdentifiedSharedQueue.h"
+
+#include <boost/interprocess/ipc/message_queue.hpp>
+
+#include <sstream>
+
+using namespace boost::interprocess;
+
+
+const std::size_t MAX_MSG_SIZE = 4096;
+
+namespace AthenaMP {   
+
+//- construction/destruction -------------------------------------------------
+IdentifiedSharedQueue::IdentifiedSharedQueue() : SharedQueue()
+{
+/* empty */
+}
+
+IdentifiedSharedQueue::IdentifiedSharedQueue( const std::string& name, int max_msg, bool do_unlink ) :
+      SharedQueue( name, max_msg, do_unlink )
+{
+/* empty */
+}
+
+//- public member functions --------------------------------------------------
+inline std::string add_pid( const std::string& buf )
+{
+   std::ostringstream s;
+   s << (long)getpid() << '\0' << buf << std::ends;
+   return s.str();
+}
+
+bool IdentifiedSharedQueue::try_send( const std::string& buf )
+{
+   return this->SharedQueue::try_send( add_pid( buf ) );
+}
+
+bool IdentifiedSharedQueue::send( const std::string& buf )
+{
+   return this->SharedQueue::send( add_pid( buf ) );
+}
+
+inline std::string get_pid( const std::string& buf, pid_t &pid )
+{
+   std::istringstream s( buf );
+   long id = -1;
+   s >> id;
+   pid = (pid_t)id;
+   std::string::size_type pos = s.tellg();
+   if ( (pos+1) <= buf.size() )
+      return buf.substr( pos+1 );
+   return "";
+}
+  
+
+std::string IdentifiedSharedQueue::try_receive()
+{
+   pid_t pid;
+   return try_receive( pid );
+}
+
+std::string IdentifiedSharedQueue::try_receive( pid_t& id )
+{
+   const std::string& buf = this->SharedQueue::try_receive();
+   return get_pid( buf, id );
+}
+
+std::string IdentifiedSharedQueue::receive()
+{
+   pid_t pid;
+   return receive( pid );
+}
+
+std::string IdentifiedSharedQueue::receive( pid_t& id )
+{
+   const std::string& buf = this->SharedQueue::receive();
+   return get_pid( buf, id );
+}
+
+} // namespace AthenaMP

src/_athenamp/Process.cxx

+#include "AthenaMP/Process.h"
+
+#include <boost/interprocess/ipc/message_queue.hpp>
+
+#include <sys/prctl.h>
+#include <dlfcn.h>
+#include <signal.h>
+
+#include <iostream>
+
+using namespace boost::interprocess;
+
+
+namespace AthenaMP {   
+
Process Process::launch()
{
// Fork a new child. Streams are flushed first so buffered output is not
// duplicated into the child. Returns a Process holding the child's pid in
// the mother, 0 in the child, or -1 if fork failed.
   std::cout.flush();
   std::cerr.flush();

   pid_t pid = fork();
   if ( pid == 0 )
      prctl( PR_SET_PDEATHSIG, SIGHUP );          // Linux only: die with mother

   return Process( pid );
}
+
+
//- construction/destruction -------------------------------------------------
// Wrap an existing pid; queues start unattached (see connectIn/connectOut).
Process::Process( pid_t pid ) : m_inbox(), m_outbox(), m_pid( pid )
{

}

// Copies share the queue handles and refer to the same pid.
Process::Process( const Process& other ) :
      m_inbox( other.m_inbox ), m_outbox( other.m_outbox ), m_pid( other.m_pid )
{

}

Process& Process::operator=( const Process& other )
{
   if ( this != &other ) {
      m_inbox = other.m_inbox;
      m_outbox = other.m_outbox;
      m_pid = other.m_pid;
   }
   return *this;
}

// Does not terminate or reap the child; this is only a handle.
Process::~Process()
{

}
+
+
//- public member functions --------------------------------------------------
// pid as handed to the constructor: 0 in the child, child's pid in mother
pid_t Process::getProcessID() const
{
   return m_pid;
}

// Attach the work queue (mother -> this process); only a named (non-empty)
// queue is accepted.
bool Process::connectIn( const SharedQueue& queue )
{
   if ( ! queue.name().empty() ) {
      m_inbox = queue;
   /* TODO: check that the queue is valid and can receive messages */
      return true;
   }

   return false;
}

// Attach the result queue (this process -> mother); only a named (non-empty)
// queue is accepted.
bool Process::connectOut( const IdentifiedSharedQueue& queue )
{
   if ( ! queue.name().empty() ) {
      m_outbox = queue;
   /* TODO: check that the queue is valid and can send messages */
      return true;
   }

   return false;
}
+
+bool Process::schedule( const std::string& func, const std::string& args )
+{
+// Added to the inbox, typically from the mother; taken out of the inbox in
+// the mainloop in the worker.
+    bool send_ok = m_inbox.try_send( func );
+    if ( send_ok ) send_ok = m_inbox.try_send( args );
+
+   return send_ok;
+}
+
// Worker-side message loop: repeatedly pull a (function, args) pair from
// the inbox, resolve the function with dlsym, run it, and post any output
// back on the shared outbox. Only meaningful in the child (m_pid == 0);
// in the mother it returns 1 immediately. Exit codes: 0x09 unresolvable
// function, 0x0A posting output failed, 0x0B function returned no output.
int Process::mainloop() {
// TODO: the exit codes used here continue where the "miscellaneous" exit codes
// in AthenaCommon/ExitCodes.py end. There's nothing to enforce that, though,
// and it's not documented either.
   if ( m_pid != 0 )
      return 1;

   int exit_code = 0; 

   while ( true ) {
   // blocking: wait for the next scheduled (function, args) pair
      std::string func = m_inbox.receive();
      std::string args = m_inbox.receive();

      if ( func == "exit" )
         break;         // drop out of mainloop, will result in exit of process
      
   // resolve the function in the global symbol table
      scheduled_func sf;
      *(void**)(&sf) = dlsym( RTLD_DEFAULT, func.c_str() );
      if ( ! sf ) {
      // TODO: msgsvc this
         std::cerr << "(_athenamp) ignoring function: " << func << std::endl;
         exit_code = 0x09;
         continue;
      }

   // contract: the callee returns a malloc'd struct with malloc'd data,
   // both freed here after the result is posted
      ScheduledWork inwork = { (void*)args.c_str(), args.size() };
      ScheduledWork* outwork = sf( &inwork );

      if ( outwork ) {
         bool posted = m_outbox.try_send( std::string( (char*)outwork->data, outwork->size ) );
         free( outwork->data );
         free( outwork );

         if ( ! posted ) {
            exit_code = 0x0A;
            break;
         }
      } else {
      // assume reportable as an error
         exit_code = 0x0B;
      }
   }

   return exit_code;     // TODO: standardize/document exit code
}
+
+} // namespace AthenaMP

src/_athenamp/ProcessGroup.cxx

+#include "AthenaMP/ProcessGroup.h"
+#include "AthenaMP/SharedQueue.h"
+
+#include <boost/interprocess/ipc/message_queue.hpp>
+
#include <sys/wait.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sstream>
+
+using namespace boost::interprocess;
+
+
//- helper -------------------------------------------------------------------
// Build a uniquely named queue: "athenamp_<owner>_<creating pid>_<count>".
static inline AthenaMP::SharedQueue create_queue( const std::string& owner, int count )
{
   std::ostringstream s;
   s << "athenamp_" << owner << '_' << getpid() << '_' << count << std::ends;
   AthenaMP::SharedQueue queue = AthenaMP::SharedQueue( s.str() );
   return queue;
}
+
+namespace AthenaMP {
+
//- construction/destruction -------------------------------------------------
// Children are forked lazily (see create()); nprocs < 0 means one worker
// per available core.
ProcessGroup::ProcessGroup( int nprocs ) : m_nprocs( nprocs ), m_pgid( -1 )
{
   if ( m_nprocs < 0 ) {
   // determine number of cores from system (available ones only)
      m_nprocs = sysconf( _SC_NPROCESSORS_ONLN );
   }
}

ProcessGroup::~ProcessGroup()
{
// NOTE(review): does not terminate or reap children still running
}
+
// Fork m_nprocs children, each with its own inbox and all sharing the
// mother's identified queue as outbox; the first child becomes the POSIX
// process group leader. In the children this call never returns: they run
// mainloop() and exit with its status.
bool ProcessGroup::create()
{
// TODO: this code leaves the queues from the previous children visible to all
// their subsequent siblings. This can be helped by creating the queue first
// in the children, but the current code at least requires no synchronization,
// and has a minimum chance of leaving resources on failure.

   if ( m_nprocs <= 0 )
      return false;

// a single queue for posting back onto the mother
   if ( ! m_inbox ) {
      std::ostringstream s;
      s << "athenamp_mother_" << getpid() << std::ends;
      m_inbox = AthenaMP::IdentifiedSharedQueue( s.str() );
   }

// create process group leader
   SharedQueue queue = create_queue( "child", 0 );
   Process leader = Process::launch();
   leader.connectIn( queue );
   leader.connectOut( m_inbox );
   pid_t lpid = leader.getProcessID();
   if ( lpid == 0 ) {
   // child side: serve requests until told to exit
      int status = leader.mainloop();
      exit( status );
   } else if ( lpid == -1 )
      return false;

// make the leader head of a new process group and remember its group id
   setpgid( lpid, 0 );
   m_processes.push_back( leader );
   m_pgid = getpgid( lpid );

// create rest of the group
   for ( int i = 1; i < m_nprocs; ++i ) {
      SharedQueue queue = create_queue( "child", i );
      Process p = Process::launch();
      p.connectIn( queue );
      p.connectOut( m_inbox );
      if ( p.getProcessID() == 0 ) {
         int status = p.mainloop();
         exit( status );
      }
   // put each sibling into the leader's process group
      setpgid( p.getProcessID(), m_pgid );
      m_processes.push_back( p );
   }

   return true;
}
+
+
//- public member functions --------------------------------------------------
// POSIX process group id of the children; -1 until create() has run
pid_t ProcessGroup::getGroupID() const
{
   return m_pgid;
}
+
+int ProcessGroup::map_async( const std::string& func, const std::string& args ) {
+// Map the named (C) function 'func' onto all current child processes. Does
+// not wait for the results, but will return success only if the writing to
+// all the child queues succeeds.
+
+   if ( m_processes.empty() ) {
+      if ( ! create() )
+         return -1;
+   }
+
+   for ( std::vector< Process >::iterator iproc = m_processes.begin();
+         iproc != m_processes.end(); ++iproc ) {
+
+      if ( ! iproc->schedule( func, args ) ) {
+      // stopping the scheduling on all other processes is debatable ...
+         return -1;
+      }
+   }
+
+   return 0;
+}
+
+int ProcessGroup::wait( std::vector< ProcessResult >& status, int options )
+{
+// Wait for all child processes and return their status codes in 'status'.
+   if ( ! status.empty() )
+      status.clear();
+
+   if ( m_processes.empty() )
+      return 0;
+
+// Schedule an exit if we are to wait forever
+   if ( ! options & WNOHANG )
+      map_async( "exit" );
+
+   int result = 0;
+   while ( status.size() != m_processes.size() ) {
+      int child_status = 0;
+      pid_t child = waitpid( -m_pgid, &child_status, options );
+      if ( child < 0 ) {
+         result = errno;
+         break;
+      }
+      child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
+      ProcessResult p = { child, child_status, { 0, 0 } };
+      status.push_back( p );
+   }
+
+// retrieve the last posted returns from our inbox
+   while ( true ) {
+      pid_t pid = -1;
+      std::string buf = m_inbox.try_receive( pid );
+      if ( pid < 0 )
+         break;
+
+   // this is silly, but nprocs is presumably a rather small number
+      for ( std::vector< ProcessResult >::iterator
+               istat = status.begin(); istat < status.end(); ++istat ) {
+         if ( istat->pid == pid ) {
+            free( istat->output.data );
+            istat->output.data = malloc( buf.size() );
+            memcpy( istat->output.data, buf.data(), buf.size() );
+            istat->output.size = buf.size();
+            break;
+         }
+      }
+
+   }
+
+// Processes are dead now
+   m_processes.clear();
+
+   return result;
+}
+
const std::vector< Process >& ProcessGroup::getChildren() const
{
// Give access to the processes; this is temporarily needed for the python
// interface, but it is better broken up into the actual needs).
   return m_processes;
}
+
+} // namespace AthenaMP

src/_athenamp/PyAthenaMP.h

#ifndef ATHENAMP_PYATHENAMP_H
#define ATHENAMP_PYATHENAMP_H

// to prevent problems with fpos_t and redefinition warnings
#if defined(linux)

#include <stdio.h>

#ifdef _POSIX_C_SOURCE
#undef _POSIX_C_SOURCE
#endif

#ifdef _FILE_OFFSET_BITS
#undef _FILE_OFFSET_BITS
#endif

#ifdef _XOPEN_SOURCE
#undef _XOPEN_SOURCE
#endif

#endif

// Python.h redefines the feature macros undef'd above, so it must come
// after the cleanup
#include "Python.h"

#endif // !ATHENAMP_PYATHENAMP_H

src/_athenamp/PyModule.cxx

+#include "PyAthenaMP.h"
+#include "PyProcessGroup.h"
+#include "PyProcess.h"
+#include "PySharedQueue.h"
+
+
//- data ---------------------------------------------------------------------
PyObject* gAMPModule = 0;       // the _athenamp module object, set in init


using namespace AthenaMP;
+
//- module methods -------------------------------------------------------------
// launch() -> Process proxy: fork a child via Process::launch(); raises
// RuntimeError if the fork failed (pid < 0). Takes no arguments.
static PyObject* amp_launch( PyObject*, PyObject* args )
{
   if ( ! PyArg_ParseTuple( args, const_cast< char* >( ":launch" ) ) )
        return NULL;
 
   const Process& proc = Process::launch();
   if ( proc.getProcessID() < 0 ) {
      PyErr_SetString( PyExc_RuntimeError, "could not launch child process" );
      return NULL;
   }

   return (PyObject*)ProcessProxy_New( proc );
}
+
+
//- data -----------------------------------------------------------------------
// module-level method table for _athenamp
static PyMethodDef gAMPMethods[] = {
   { (char*) "launch", (PyCFunction)amp_launch,
     METH_VARARGS, (char*) "launch a new child process; returns process id" },
   { NULL, NULL, 0, NULL }
};
+
+
//______________________________________________________________________________
// Finalize 'pytype' and expose it on the module under 'name'; prints the
// python error and returns false on failure.
static inline bool addType( PyTypeObject* pytype, const char* name )
{
   if ( PyType_Ready( pytype ) < 0 ) {
      PyErr_Print();
      return false;
   }

   Py_INCREF( (PyObject*)pytype );           // PyModule_AddObject steals reference
   if ( PyModule_AddObject( gAMPModule, (char*)name, (PyObject*)pytype ) < 0 ) {
      Py_DECREF( (PyObject*)pytype );
      PyErr_Print();
      return false;
   }

   return true;
}
+
+   
// Module entry point: create the _athenamp module and register its three
// types. On any failure the module is left partially initialized and the
// python error is reported by the interpreter.
PyMODINIT_FUNC init_athenamp()
{
   gAMPModule = Py_InitModule( (char*)"_athenamp", gAMPMethods );

   if ( ! gAMPModule )
      return;

   if ( ! addType( &ProcessGroupProxy_Type, "ProcessGroup" ) )
      return;

   if ( ! addType( &ProcessProxy_Type, "Process" ) )
      return;

   if ( ! addType( &SharedQueueProxy_Type, "SharedQueue" ) )
      return;

}

src/_athenamp/PyProcess.cxx

+#include "PyAthenaMP.h"
+#include "PyProcess.h"
+
+#include <new>
+
+
//- module classes -----------------------------------------------------------
// C-level factories: allocate a proxy via tp_new, then construct the
// embedded Process member in place (the proxy's storage is GC-allocated,
// so the C++ object must be placement-new'd).
AthenaMP::ProcessProxy* AthenaMP::ProcessProxy_New( pid_t pid )
{
   ProcessProxy* pyproc = (ProcessProxy*)ProcessProxy_Type.tp_new( &ProcessProxy_Type, 0, 0 );
   new( (void*)&pyproc->m_process ) Process( pid );
   return pyproc;
}

AthenaMP::ProcessProxy* AthenaMP::ProcessProxy_New( const AthenaMP::Process& proc )
{
   ProcessProxy* pyproc = (ProcessProxy*)ProcessProxy_Type.tp_new( &ProcessProxy_Type, 0, 0 );
   new( (void*)&pyproc->m_process ) Process( proc );
   return pyproc;
}
+
+
+using namespace AthenaMP;
+
//__________________
// tp_new: GC-allocate the proxy; the embedded Process is NOT constructed
// here, but by pp_init or the ProcessProxy_New factories (placement new)
static ProcessProxy* pp_new( PyTypeObject*, PyObject*, PyObject* )
{
   ProcessProxy* pyproc = PyObject_GC_New( ProcessProxy, &ProcessProxy_Type );
   PyObject_GC_Track( pyproc );
   return pyproc;
}

//__________________
// tp_init: Process(pid); constructs the embedded Process in place.
// NOTE(review): calling __init__ a second time placement-news over the
// already-constructed member without destroying it first -- confirm that
// this is acceptable for Process (it holds queue handles)
static int pp_init( ProcessProxy* self, PyObject* args, PyObject* kwds )
{
   int pid = -1;
   static const char* kwlist[] = { "pid", NULL };
   if ( ! PyArg_ParseTupleAndKeywords( args, kwds, (char*)"i:Process", (char**)kwlist, &pid ) ) {
   // construct anyway so pp_dealloc can safely run the destructor
      new( (void*)&self->m_process ) Process( (pid_t)-1 );
      return -1;
   }

   new( (void*)&self->m_process ) Process( (pid_t)pid );
   return 0;
}
+
//__________________
// tp_traverse: the proxy holds no python object references, nothing to visit
static int pp_traverse( ProcessProxy*, visitproc, void* )
{
   return 0;
}

//__________________
// tp_clear: no python references to drop
static int pp_clear( ProcessProxy* )
{
   return 0;
}

//__________________
// tp_dealloc: explicitly run the embedded Process' destructor (storage was
// GC-allocated, so the C++ object is destroyed manually), then free the proxy
static void pp_dealloc( ProcessProxy* pyproc )
{
   PyObject_GC_UnTrack( pyproc );

   pyproc->m_process.~Process();

   PyObject_GC_Del( pyproc );
}
+
//__________________
// repr: "Process <pid> from group <gid>"; group id is only looked up for a
// live child (pid > 0), otherwise reported as -1
static PyObject* pp_repr( ProcessProxy* self )
{
   pid_t pid = self->m_process.getProcessID();
   pid_t gid = -1;
   if ( 0 < pid )
      gid = getpgid( pid );
   return PyString_FromFormat( "Process %ld from group %ld", (long)pid, (long)gid );
}

//__________________
// getter for the read-only 'pid' attribute
static PyObject* pp_getpid( ProcessProxy* pyproc, void* )
{
   return PyInt_FromLong( (long)pyproc->m_process.getProcessID() );
}

//__________________
static PyGetSetDef pp_getset[] = {
   { (char*)"pid", (getter)pp_getpid, NULL, NULL, NULL },
   { (char*)NULL, NULL, NULL, NULL, NULL }
};
+
//__________________
// type object for _athenamp.Process: GC-enabled (tp_new uses
// PyObject_GC_New), custom repr, read-only 'pid' attribute
PyTypeObject AthenaMP::ProcessProxy_Type = {
   PyVarObject_HEAD_INIT( &PyType_Type, 0 )
   (char*)"_athenamp.ProcessProxy", // tp_name
   sizeof(ProcessProxy),      // tp_basicsize
   0,                         // tp_itemsize
   (destructor)pp_dealloc,    // tp_dealloc
   0,                         // tp_print
   0,                         // tp_getattr
   0,                         // tp_setattr
   0,                         // tp_compare
   (reprfunc)pp_repr,         // tp_repr
   0,                         // tp_as_number
   0,                         // tp_as_sequence
   0,                         // tp_as_mapping
   0,                         // tp_hash
   0,                         // tp_call
   0,                         // tp_str
   0,                         // tp_getattro
   0,                         // tp_setattro
   0,                         // tp_as_buffer
   Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC,      // tp_flags
   (char*)"AthenaMP process proxy",              // tp_doc
   (traverseproc)pp_traverse, // tp_traverse
   (inquiry)pp_clear,         // tp_clear
   0,                         // tp_richcompare
   0,                         // tp_weaklistoffset
   0,                         // tp_iter
   0,                         // tp_iternext
   0,                         // tp_methods
   0,                         // tp_members
   pp_getset,                 // tp_getset
   0,                         // tp_base
   0,                         // tp_dict
   0,                         // tp_descr_get
   0,                         // tp_descr_set
   0,                         // tp_dictoffset
   (initproc)pp_init,         // tp_init
   0,                         // tp_alloc
   (newfunc)pp_new,           // tp_new
   0,                         // tp_free
   0,                         // tp_is_gc
   0,                         // tp_bases
   0,                         // tp_mro
   0,                         // tp_cache
   0,                         // tp_subclasses
   0                          // tp_weaklist
#if PY_VERSION_HEX >= 0x02030000
   , 0                        // tp_del
#endif
#if PY_VERSION_HEX >= 0x02060000
   , 0                        // tp_version_tag
#endif
};

src/_athenamp/PyProcess.h

#ifndef ATHENAMP_PY_PROCESS_H
#define ATHENAMP_PY_PROCESS_H

#include "AthenaMP/Process.h"


namespace AthenaMP {

// Python object wrapping a C++ Process by value; m_process is constructed
// with placement new and destroyed explicitly in the type's dealloc.
struct ProcessProxy {
   PyObject_HEAD
   Process m_process;
};


//- process proxy type and type verification ---------------------------------
extern PyTypeObject ProcessProxy_Type;

// true if 'object' is a ProcessProxy (or subtype) instance
template< typename T >
inline bool ProcessProxy_Check( T* object )
{
   return object && PyObject_TypeCheck( object, &ProcessProxy_Type );
}

// true if 'object' is exactly a ProcessProxy instance (no subtypes)
template< typename T >
inline bool ProcessProxy_CheckExact( T* object )
{
   return object && Py_TYPE(object) == &ProcessProxy_Type;
}

//- creation -----------------------------------------------------------------
ProcessProxy* ProcessProxy_New( pid_t pid );
ProcessProxy* ProcessProxy_New( const Process& );

} // namespace AthenaMP

#endif // !ATHENAMP_PY_PROCESS_H

src/_athenamp/PyProcessGroup.cxx

+#include "PyAthenaMP.h"
+#include "PyProcessGroup.h"
+#include "PyProcess.h"
+
+#include <string.h>
+
+#include <new>
+
+#include <marshal.h>
+
+
//- module classes -----------------------------------------------------------
// C-level factory: allocate a proxy via tp_new, then construct the embedded
// ProcessGroup in place (storage is GC-allocated, hence placement new)
AthenaMP::ProcessGroupProxy* AthenaMP::ProcessGroupProxy_New( int nprocs )
{
   ProcessGroupProxy* pygroup =
      (ProcessGroupProxy*)ProcessGroupProxy_Type.tp_new( &ProcessGroupProxy_Type, 0, 0 );
   new( (void*)&pygroup->m_group ) ProcessGroup( nprocs );
   return pygroup;
}
+
+
+using namespace AthenaMP;
+
//__________________
// tp_new: GC-allocate the proxy; the embedded ProcessGroup is constructed
// later (pg_init or ProcessGroupProxy_New) via placement new
static ProcessGroupProxy* pg_new( PyTypeObject*, PyObject*, PyObject* )
{
   ProcessGroupProxy* pygroup = PyObject_GC_New( ProcessGroupProxy, &ProcessGroupProxy_Type );
   PyObject_GC_Track( pygroup );
   return pygroup;
}
+
+//__________________
+static int pg_init( ProcessGroupProxy* self, PyObject* args, PyObject* kwds )
+{
+   int nprocs = -1;
+   static const char* kwlist[] = { "nprocs", NULL };
+   if ( ! PyArg_ParseTupleAndKeywords( args, kwds, (char*)"|i:Process", (char**)kwlist, &nprocs ) ) {
+      new( (void*)&self->m_group ) ProcessGroup( 0 );
+      return -1;
+   }
+
+   new( (void*)&self->m_group ) ProcessGroup( nprocs );
+   return 0;
+}
+
//__________________
// tp_traverse: the proxy holds no python object references, nothing to visit
static int pg_traverse( ProcessGroupProxy*, visitproc, void* )
{
   return 0;
}

//__________________
// tp_clear: no python references to drop
static int pg_clear( ProcessGroupProxy* )
{
   return 0;
}

//__________________
// tp_dealloc: explicitly run the embedded ProcessGroup's destructor
// (storage was GC-allocated), then free the proxy
static void pg_dealloc( ProcessGroupProxy* pygroup )
{
   PyObject_GC_UnTrack( pygroup );

   pygroup->m_group.~ProcessGroup();

   PyObject_GC_Del( pygroup );
}


//__________________
// no attributes exposed (sentinel-only table)
static PyGetSetDef pg_getset[] = {
   { (char*)NULL, NULL, NULL, NULL, NULL }
};
+
+
+//__________________
+ScheduledWork* AthenaMP_MappedPyFunction( ScheduledWork* databuf ) {
+
+   PyObject* pydata = PyMarshal_ReadObjectFromString( (char*)databuf->data, databuf->size );
+   if ( ! pydata || ! PyTuple_Check( pydata ) ) {
+      PyErr_Print();
+      return 0;
+   }
+
+// one string entry in pydata is guaranteed by pg_map_async
+   std::string func = PyString_AsString( PyTuple_GET_ITEM( pydata, 0 ) );
+
+   PyObject* pymod = 0;
+   std::string::size_type pos = func.rfind( '.' );
+   if ( pos == std::string::npos ) {
+      pymod = PyImport_AddModule( "__main__" );
+      Py_XINCREF( pymod );
+      pos = 0;
+   } else {
+      pymod = PyImport_ImportModuleNoBlock( func.substr(0, pos).c_str() );
+      pos = pos + 1;
+   }
+
+   if ( ! pymod ) {
+      PyErr_Print();
+      Py_DECREF( pydata );
+      return 0;
+   }
+
+   PyObject* pyfunc = PyObject_GetAttrString( pymod, func.substr(pos, std::string::npos).c_str() );
+   if ( ! pyfunc ) {
+      PyErr_Print();
+      Py_DECREF( pymod );
+      Py_DECREF( pydata );
+      return 0;
+   }
+
+   if ( ! PyCallable_Check( pyfunc ) ) {
+      PyErr_Format( PyExc_TypeError,
+         "received non-callable %s", func.substr(pos, std::string::npos).c_str() );
+      PyErr_Print();
+      Py_DECREF( pyfunc );
+      Py_DECREF( pymod );
+      Py_DECREF( pydata );
+      return 0;
+   }
+
+   PyObject* pyargs = PyTuple_GetSlice( pydata, 1, PyTuple_GET_SIZE( pydata ) );
+   PyObject* pyresult = PyObject_Call( pyfunc, pyargs, NULL );
+   Py_DECREF( pyargs );
+
+   ScheduledWork* result = 0;
+   if ( pyresult ) {
+      PyObject* pybuf = PyMarshal_WriteObjectToString( pyresult, Py_MARSHAL_VERSION );
+      if ( pybuf ) {
+         result = (ScheduledWork*)malloc( sizeof( ScheduledWork ) );
+         result->data = malloc( PyString_GET_SIZE( pybuf ) );
+         memcpy( result->data, PyString_AS_STRING( pybuf ), PyString_GET_SIZE( pybuf ) );
+         result->size = PyString_GET_SIZE( pybuf );
+         Py_DECREF( pybuf );
+      } else {
+         PyErr_Print();
+      }