Anonymous avatar Anonymous committed 6b344d5

check StatusCode

Comments (0)

Files changed (18)

GaudiMP/IoComponentMgr.h

+///////////////////////// -*- C++ -*- /////////////////////////////
+// IoComponentMgr.h 
+// Header file for class IoComponentMgr
+// Author: S.Binet<binet@cern.ch>
+/////////////////////////////////////////////////////////////////// 
+#ifndef GAUDIMP_IOCOMPONENTMGR_H
+#define GAUDIMP_IOCOMPONENTMGR_H 1
+
+// Python includes
+#include <Python.h>
+
+// STL includes
+#include <string>
+#include <map>
+#include <list>
+
+// FrameWork includes
+#include "GaudiKernel/Service.h"
+
+// GaudiKernel
+#include "GaudiKernel/IIoComponent.h"
+#include "GaudiKernel/IIoComponentMgr.h"
+
+// Forward declaration
+class ISvcLocator;
+template <class TYPE> class SvcFactory;
+
+
+class IoComponentMgr: public extends1<Service, IIoComponentMgr> {
+
+  friend class SvcFactory<IoComponentMgr>;
+
+  /////////////////////////////////////////////////////////////////// 
+  // Public methods: 
+  /////////////////////////////////////////////////////////////////// 
+ public: 
+
+  // Copy constructor: 
+
+  /// Constructor with parameters: 
+  IoComponentMgr( const std::string& name, ISvcLocator* pSvcLocator );
+
+  /// Destructor: 
+  virtual ~IoComponentMgr(); 
+
+  // Assignment operator: 
+  //IoComponentMgr &operator=(const IoComponentMgr &alg); 
+
+  /// Gaudi Service Implementation
+  //@{
+  virtual StatusCode initialize();
+  virtual StatusCode finalize();
+
+  //@}
+
+  /////////////////////////////////////////////////////////////////// 
+  // Const methods: 
+  ///////////////////////////////////////////////////////////////////
+
+  /** @brief: check if the registry contains a given @c IIoComponent
+   */
+  virtual
+  bool io_hasitem (IIoComponent* iocomponent) const;
+
+  /** @brief: check if the registry contains a given @c IIoComponent and
+   *          that component had @param `fname` as a filename
+   */
+  virtual
+  bool io_contains (IIoComponent* iocomponent,
+		    const std::string& fname) const;
+
+  /////////////////////////////////////////////////////////////////// 
+  // Non-const methods: 
+  /////////////////////////////////////////////////////////////////// 
+
+  /** @brief: allow a @c IIoComponent to register itself with this
+   *          manager so appropriate actions can be taken when e.g.
+   *          a @c fork(2) has been issued (this is usually handled
+   *          by calling @c IIoComponent::io_reinit on every registered
+   *          component)
+   */
+  virtual
+  StatusCode io_register (IIoComponent* iocomponent);
+
+  /** @brief: register a @c IIoComponent with this manager, additionally
+   *          recording the I/O mode @param `iomode` and the filename
+   *          @param `fname` that component handles, so the appropriate
+   *          actions can be taken when e.g. a @c fork(2) has been issued
+   */
+  virtual
+  StatusCode io_register (IIoComponent* iocomponent,
+			  IIoComponentMgr::IoMode::Type iomode,
+			  const std::string& fname);
+
+  /** @brief: retrieve the new filename for a given @c IIoComponent and
+   *          @param `fname` filename
+   */
+  virtual
+  StatusCode io_retrieve (IIoComponent* iocomponent,
+			  std::string& fname);
+
+  /** @brief: reinitialize the I/O subsystem.
+   *  This effectively calls @c IIoComponent::io_reinit on all the registered
+   *  @c IIoComponent.
+   */
+  virtual
+  StatusCode io_reinitialize ();
+
+  /** @brief: finalize the I/O subsystem.
+   *  Hook to allow to e.g. give a chance to I/O subsystems to merge output
+   *  files. Not sure how to do this correctly though...
+   */
+  virtual
+  StatusCode io_finalize ();
+
+  /////////////////////////////////////////////////////////////////// 
+  // Private data: 
+  /////////////////////////////////////////////////////////////////// 
+ private: 
+
+  /// Default constructor: 
+  IoComponentMgr();
+
+  /// Message stream for this service (mutable so const methods can log).
+  /// NOTE(review): MsgStream is used without an explicit #include of
+  /// GaudiKernel/MsgStream.h here -- presumably pulled in transitively
+  /// via Service.h; confirm.
+  mutable MsgStream m_log;
+
+  typedef std::map<std::string, IIoComponent*> IoRegistry_t;
+  /// Registry of @c IIoComponents
+  IoRegistry_t m_ioregistry;
+
+  typedef std::list<IIoComponent*> IoStack_t;
+  /// Stack of @c IIoComponents to properly handle order of registration
+  IoStack_t m_iostack;
+
+  /** pointer to a python dictionary holding the associations:
+   *    { 'iocomp-name' : { 'oldfname' : [ 'iomode', 'newfname' ] } }
+   */
+  PyObject *m_dict;
+
+  /// location of the python dictionary
+  std::string m_dict_location;
+
+}; 
+
+
+#endif //> !GAUDIMP_IOCOMPONENTMGR_H

GaudiMP/PyROOTPickle.h

+// This file's extension implies that it's C, but it's really -*- C++ -*-.
+
+/**
+ * @file GaudiMP/PyROOTPickle.h
+ * @author Wim Lavrijsen
+ * @date Apr 2008
+ * @brief Port pickling functionality while awaiting newer release.
+ *
+ * PyROOT does not implement direct pickling of ObjectProxy's until release
+ * 5.20 of ROOT; this code brings it forward for use in ATLAS.
+ *
+ * The code here will make the following changes to PyROOT's behavior:
+ *
+ *  - Add a method "_ObjectProxy__expand__" to libPyROOT that is stored by
+ *     name by the pickler used by the unpickler. The expand function takes
+ *     a class name and a python string, which are turned into an object
+ *     using TBufferFile.
+ *
+ *  - Add a "__reduce__" method to ObjectProxy, which turns ObjectProxy's
+ *     into python strings through serialization done by TBufferFile. The
+ *     python string is subsequently pickled.
+ */
+
+
+#ifndef GAUDIMP_PYROOTPICKLE_H
+#define GAUDIMP_PYROOTPICKLE_H
+
+#include "GaudiKernel/Kernel.h"
+
+#ifdef _POSIX_C_SOURCE
+# undef _POSIX_C_SOURCE
+#endif
+#include "Python.h"
+
+
+namespace GaudiMP {
+
+
+class GAUDI_API PyROOTPickle
+{
+public:
+  // All functionality is exposed through the single static Initialize
+  // entry point; the class itself carries no state.
+  /**
+   * @brief Install the pickling of ObjectProxy's functionality.
+   * @param libpyroot_pymodule The libPyROOT python module
+   * @param objectproxy_pytype The ObjectProxy python type
+   */
+  static void Initialize (PyObject* libpyroot_pymodule,
+                          PyObject* objectproxy_pytype);
+};
+
+
+} // namespace GaudiMP
+
+
+#endif // not GAUDIMP_PYROOTPICKLE_H

GaudiMP/TESSerializer.h

+// $Id: TESSerializer.h,v 1.1 2008/10/17 08:56:05 mato Exp $
+#ifndef GAUDIMP_TESSERIALIZER_H
+#define GAUDIMP_TESSERIALIZER_H
+
+#include "GaudiKernel/IDataStoreAgent.h"
+#include "GaudiKernel/IAddressCreator.h"
+
+// ROOT includes
+#include "TClass.h"
+
+// vector and string
+#include <string>
+#include <vector>
+#include <map>
+
+// forward declarations
+class IDataProviderSvc;
+class IDataManagerSvc;
+class TBufferFile;
+class DataStoreItem;
+class DataObject;
+class IAddressCreator;
+
+/** A class to serialize/deserialize
+    TES objects to and from a TBufferFile
+    Author:  P. Mato
+    Author:  E. Smith
+    Version: 1.1
+*/
+
+namespace GaudiMP {
+  class GAUDI_API TESSerializer :  virtual public IDataStoreAgent {
+    typedef std::vector<DataStoreItem*> Items;
+    typedef std::vector<std::string>    ItemNames;
+    typedef std::vector<DataObject*>    Objects;
+  public:
+    /// Constructor
+    /// @param svc data provider (TES) to serialize from / deserialize into
+    /// @param ac  creator used to (re)build opaque addresses
+    TESSerializer(IDataProviderSvc* svc, IAddressCreator* ac);
+
+    /// Dump TES contents listed in m_itemList/m_optItemList to a TBufferFile
+    void dumpBuffer(TBufferFile&);
+
+    /// Rebuild TES from items in a TBufferFile
+    void loadBuffer(TBufferFile&);
+
+    /// add an item to the TESSerializer's list (#notation)
+    void addItem(const std::string& path);
+
+    /// add an item to the TESSerializer's optional list (#notation)
+    void addOptItem(const std::string& path);
+
+    /// Analysis callback (IDataStoreAgent): invoked for each registry
+    /// entry while the data store is being traversed
+    bool analyse(IRegistry* dir, int level);
+
+    /// print out the contents of m_itemList and m_optItemList (std::cout)
+    void checkItems( );
+
+  protected:
+    /// Add item to the list of items to be serialized (#notation)
+    void addItem(Items& itms, const std::string& descriptor);
+
+    /// Find single item identified by its path (exact match)
+    DataStoreItem* findItem(const std::string& path);
+
+  private:
+     /// TES pointer
+    IDataProviderSvc* m_TES;
+     /// TES data manager pointer (same store, manager interface)
+    IDataManagerSvc*  m_TESMgr;
+    /// Vector of item names
+    ItemNames         m_itemNames;
+    /// Vector of items to be saved to this stream (DataStoreItem ptrs)
+    Items             m_itemList;
+    /// Vector of item names (std::strings)
+    ItemNames         m_optItemNames;
+    /// Vector of optional items to be saved to this stream (DataStoreItem ptrs)
+    Items             m_optItemList;
+    /// Current item while traversing the TES tree
+    DataStoreItem*    m_currentItem;
+    /// Selected list of Objects to be serialized (DataObject ptrs)
+    Objects           m_objects;
+
+    /// Map of gROOT class information
+    std::map<std::string, TClass*> m_classMap;
+    /// Boolean Flag as used by GaudiSvc/PersistencySvc/OutputStreamer
+    bool                 m_verifyItems;
+    /// Boolean Flag used to determine error tolerance
+    bool                 m_strict;
+    /// IAddress Creator for Opaque Addresses
+    IAddressCreator*     m_addressCreator;
+  };
+}
+#endif
+
+
cmt/requirements

+package GaudiMP
+version v1r1
+
+branches      GaudiMP cmt doc src dict python scripts
+
+use GaudiKernel        *
+use GaudiPython        * -no_auto_imports
+
+use Python             *  LCG_Interfaces -no_auto_imports
+use Reflex             *  LCG_Interfaces -no_auto_imports
+use ROOT               *  LCG_Interfaces -no_auto_imports
+
+apply_pattern install_more_includes more=GaudiMP
+apply_pattern install_python_modules
+apply_pattern install_scripts
+
+
+#-------Applications and Libraries-----------------------------------------------
+library GaudiMP   Lib/*.cpp      -import=Python -import=Reflex -import=ROOT -no_static
+apply_pattern  linker_library    library=GaudiMP
+
+apply_pattern  component_library library=GaudiMP
+
+
+#-------LCG Dictionaries---------------------------------------------------------
+apply_pattern reflex_dictionary dictionary=GaudiMP \
+                                headerfiles=$(GAUDIMPROOT)/dict/gaudimp_dict.h \
+                                selectionfile=../dict/selection.xml \
+                                options="--no_templatetypedefs -I$(Python_inc)" \
+                                imports="Python -import=Reflex -import=ROOT"
+
+#--------------------------------------------------------------------------------
+private
+
+macro GaudiMP_PyROOT_linkopts " -L$(ROOT_home)/lib -lPyROOT"
+macro_append GaudiMP_linkopts " $(GaudiMP_PyROOT_linkopts)"
+
+#--------------------------------------------------------------------------------
+
+# The rootmap name we have to use depends on the version of ROOT: ROOT 5.14 (LCG <= 53*) and ROOT 5.15 (LCG >= 54)
+macro rootmap_name "rootmap" ROOT_GE_5_15 "reflex.rootmap"
+
+# macro test_GPyTest_rootmap   $(GAUDIPYTHONROOT)/$(tag)/$(rootmap_name) \
+#       GAUDI_with_installarea $(CMTINSTALLAREA)/$(tag)/lib/$(rootmap_name) \
+#       GAUDIATLAS_with_installarea $(CMTINSTALLAREA)/$(tag)/lib/$(rootmap_name)
+
+end_private

dict/gaudimp_dict.h

+// $Id:
+// ============================================================================
+// CVS tag $Name:  $, version $Revision: 1.37 $
+// ============================================================================
+#include "GaudiKernel/Algorithm.h"
+#include "GaudiKernel/ParticleProperty.h"
+#include "GaudiKernel/Property.h"
+#include "GaudiKernel/PropertyCallbackFunctor.h"
+#include "GaudiKernel/Chrono.h"
+#include "GaudiKernel/ChronoEntity.h"
+#include "GaudiKernel/Stat.h"
+#include "GaudiKernel/StatEntity.h"
+#include "GaudiKernel/SerializeSTL.h"
+#include "GaudiKernel/StringKey.h"
+#include "GaudiKernel/Range.h"
+
+#ifdef _WIN32
+#include "GaudiKernel/GaudiHandle.h"
+#endif
+
+#ifdef __ICC
+// disable icc remark #177: declared but never referenced
+#pragma warning(disable:177)
+// disable icc warning #1125: function "C::X()" is hidden by "Y::X" -- virtual function override intended?
+#pragma warning(disable:1125)
+#endif
+
+#include "GaudiMP/PyROOTPickle.h"
+#include "GaudiMP/TESSerializer.h"
+
+#ifdef _WIN32
+#pragma warning ( disable : 4345 )
+#pragma warning ( disable : 4624 )
+#endif
+
+#ifdef __ICC
+// disable icc warning #191: type qualifier is meaningless on cast type
+// ... a lot of noise produced by the dictionary
+#pragma warning(disable:191)
+#endif
+
+// ============================================================================
+// The END
+// ============================================================================

dict/selection.xml

+<lcgdict>
+
+  <class name    = "GaudiMP::PyROOTPickle"/>
+  <class name    = "GaudiMP::TESSerializer"/>
+
+</lcgdict>

doc/release.notes

+Package: GaudiMP
+Package manager : Sebastien Binet
+
+================================ GaudiMP-01-01-04 ============================
+! 2011-06-02 - Charles Leggett
+ - check StatusCode of Service::initialize
+M       src/Lib/IoComponentMgr.cpp
+
+================================ GaudiMP-01-01-03 ============================
+! 2011-05-04 - Charles Leggett
+ - check for python initialization, add IoRegistry.py and FdsRegistry.py 
+   modules
+A       python/GaudiMP/IoRegistry.py
+A       python/GaudiMP/FdsRegistry.py
+M       cmt/requirements
+M       src/Lib/IoComponentMgr.cpp
+
+
+================================ GaudiMP-01-01-02 ============================
+! 2011-05-03 - Charles Leggett
+ - made GaudiMP a component library too, added some debug output to 
+   IoComponentMgr
+M       cmt/requirements
+M       GaudiMP/IoComponentMgr.h
+M       src/Lib/IoComponentMgr.cpp
+
+================================ GaudiMP-01-01-01 ============================
+! 2011-05-03 - Charles Leggett
+ - Add IIoComponentMgr
+M       cmt/requirements
+A       GaudiMP/IoComponentMgr.h
+A       src/Lib/IoComponentMgr.cpp
+
+================================ GaudiMP v1r1 ================================
+! 2011-04-04 - Pere Mato
+ - Fixes to work with LHCb Brunel
+   1) Namespace changed from GaudiPython to GaudiMP for TESSerializer class
+   2) Firing "EndEvent" incidents in Worker event loop. This was needed to
+      eliminate crashes in Brunel when invalid pointer to collection of
+      cluster was used in fitting algorithms.
+   3) Call update() method after having de-serialized a KeyedContainer<>. This
+      is needed to re-build the internal direct access container.
+
+============================== GaudiMP v1r0 ==================================
+! 2011-01-11 - Marco Clemencic
+ - Fixed some ICC remarks.
+
+! 2010-12-17 - Eoin Smith
+  - Updated TESSerializer.h/cpp, python/GaudiPython for Multicore Execution
+    TESSerializer class can now serialize/reconstruct Opaque Addresses using
+    the GaudiKernel IOpaqueAddress interface.
+    TESSerializer now uses a map as member variable for TClass names, to
+    improve performance (initialize once, refer back on each event iteration)
+    Multicore components collected in GMPBase.py ; the Reader, Worker and
+    Writer.  pTools.py contains extra classes; HistoAgent, FileRecordsAgent
+    for communication and consolidation of Histo and Records stores. pTools
+    also contains some extra methods used by the Reader, Writer, Workers.
+
+! 2010-12-16 - Sebastien Binet
+ - Patch #4074: migrate GaudiPython.Parallel to multiprocessing
+   First import

python/GaudiMP/FdsRegistry.py

+# @file GaudiMP.FdsRegistry.py
+# @purpose file descriptor registration and handling
+# @author Mous Tatarkhanov <tmmous@berkeley.edu>
+
+_O_ACCMODE = 3  #access-mode check for file flags.
+
+import logging
+msg = logging.getLogger( 'FdsRegistry' )
+
+class FdsDict(dict):
+    """Registry of a process' open file descriptors.
+
+    Maps fd (int) -> (fname, iomode, flags), where iomode is
+    '<INPUT>' or '<OUTPUT>' (see add() and extract_fds()).
+    """
+    name = "fds_dict"
+    curdir = None   # working-directory snapshot, set by extract_fds()
+    
+    def __missing__(self, key):
+        # NOTE(review): missing keys are materialized as "" (a string),
+        # which is inconsistent with the tuple values stored by add();
+        # callers indexing v[0] must not rely on this default.
+        self[key] = ""
+        return ""
+
+    def fname(self,i):
+        # Filename registered for fd i, or "" (with a warning) if unknown.
+        if i in self:
+            return self[i][0]
+        else:
+            msg.warning ("fds_dict:fname: No Key %s" % i)
+            return ""
+
+    def fds(self, fname):
+        # All fds currently mapped to the given filename.
+        return [i for i, v in self.iteritems() if v[0]==fname]
+
+    def has_name(self, fname):
+        # True if any registered fd points at fname.
+        for v in self.values():
+            if (v[0] == fname):
+                return True
+        return False
+    
+    # i - is the fd index (int)
+    def add(self, i, fname, iomode, flags):
+        self[i] = (fname, iomode, flags)
+        return 
+
+    def iomode(self,i):
+        # '<INPUT>'/'<OUTPUT>' for fd i, or "" (with a warning) if unknown.
+        if i in self:
+            return self[i][1]
+        else:
+            msg.warning ("fds_dict:iomode: No Key %s" % i)
+            return ""
+    
+    def get_output_fds(self):
+        return [i for i in self.keys() if self[i][1]=='<OUTPUT>']
+        
+    def get_input_fds(self):
+        return [i for i in self.keys() if self[i][1]=='<INPUT>']
+    
+    def get_fds_in_dir(self, dir=""):
+        # fds whose files live in `dir`; defaults to the directory
+        # snapshot taken by extract_fds().
+        import os
+        if dir == "" and self.curdir is not None:
+            dir = self.curdir
+        msg.debug("get_fds_in_dir(%s)" % dir)
+        return [i for i in self.keys() 
+                if os.path.samefile(os.path.dirname(self[i][0]), dir) ]
+    
+    def create_symlinks(self, wkdir=""):
+        """
+        create necessary symlinks in worker's dir if the fd is <INPUT>
+        otherwise copy <OUTPUT> file 
+        """
+        import os,shutil
+        msg.info("create_symlinks: %s" % self.get_fds_in_dir()) 
+        #some files expected to be in curdir
+        for fd in self.get_fds_in_dir():   
+            src = self[fd][0]
+            iomode = self[fd][1]
+            dst = os.path.join(wkdir, os.path.basename(src))
+            if iomode == "<INPUT>":
+                if os.path.exists(dst):
+                    # update_io_registry took care of this
+                    msg.debug("fds_dict.create_symlink:update_io_registry took care of src=%s" % src)
+                    pass
+                else:
+                    msg.debug("fds_dict.create_symlink:(symlink) src=%s, iomode=%s" % (src,iomode))
+                    os.symlink(src, dst)
+            else:
+                msg.debug("fds_dict.create_symlink: (copy) src=%s, dst=%s" % (src, dst))
+                shutil.copy(src, dst)
+                pass
+        return
+    
+    def extract_fds(self, dir=""):
+        """parse the fds of the processs -> build fds_dict
+        """
+        # Linux-specific: walks /proc/self/fd to snapshot every open fd.
+        import os, fcntl
+        msg.info("extract_fds: making snapshot of parent process file descriptors")
+        self.curdir = os.path.abspath(os.curdir)
+        iomode = '<INPUT>'
+
+        procfd = '/proc/self/fd'
+        fds = os.listdir(procfd)
+        for i in fds:
+            fd = int(i)
+            if (fd==1 or fd==2):
+                #leave stdout and stderr to redirect_log
+                continue
+            # fd 0 (stdin) is not skipped here and ends up in the snapshot
+            realname = os.path.realpath(os.path.join(procfd,i))
+            if os.path.exists(realname):
+                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+                if (flags & _O_ACCMODE) == 0: #read-only --> <INPUT>
+                    iomode = "<INPUT>"
+                else:
+                    iomode = "<OUTPUT>"
+                
+                self.add(fd, realname, iomode, flags)
+                   
+        msg.debug( "extract_fds.fds_dict=%s" % self)
+        return
+        
+    pass #FdsDict
+

python/GaudiMP/GMPBase.py

+from Gaudi.Configuration import *
+from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE
+from ROOT import TBufferFile, TBuffer
+from multiprocessing import Process, Queue, JoinableQueue, Event
+from multiprocessing import cpu_count, current_process
+from multiprocessing.queues import Empty
+from pTools import *
+import time, sys, os
+
+# This script contains the bases for the Gaudi MultiProcessing (GMP)
+# classes
+
+# There are three classes :
+#   Reader
+#   Worker
+#   Writer
+
+# Each class needs to perform communication with the others
+# For this, we need a means of communication, which will be based on
+# the python multiprocessing package
+# This is provided in SPI pytools package
+# cmt line : use pytools v1.1 LCG_Interfaces
+# The PYTHONPATH env variable may need to be modified, as this might
+# still point to 1.0_python2.5
+
+# Each class will need Queues, and a defined method for using these
+# queues.
+# For example, as long as there is something in the Queue, both ends
+# of the queue must be open
+# Also, there needs to be proper Termination flags and criteria
+# The System should be error proof.
+
+
+# Constants -------------------------------------------------------------------
+NAP = 0.001   # polling sleep, in seconds
+MB  = 1024.0*1024.0   # bytes per megabyte (float, so divisions stay float)
+# waits to guard against hanging, in seconds
+WAIT_INITIALISE   = 60*5
+WAIT_FIRST_EVENT  = 60*3
+WAIT_SINGLE_EVENT = 60*3
+WAIT_FINALISE     = 60*2
+STEP_INITIALISE   = 10
+STEP_EVENT        = 2
+STEP_FINALISE     = 5
+
+# My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
+SMAPS = False
+
+# -----------------------------------------------------------------------------
+
+# definitions
+# ----------
+# used to convert stored histos (in AIDA format) to ROOT format
+aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
+
+# used to check which type of histo we are dealing with
+# i.e. if currentHisto in aidatypes : pass
+aidatypes = ( gbl.AIDA.IHistogram,
+              gbl.AIDA.IHistogram1D,
+              gbl.AIDA.IHistogram2D,
+              gbl.AIDA.IHistogram3D,
+              gbl.AIDA.IProfile1D,
+              gbl.AIDA.IProfile2D,
+              gbl.AIDA.IBaseHistogram  )  # extra?
+
+# similar to aidatypes
+thtypes   = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
+
+# Types of OutputStream in Gaudi
+# Maps the writer's class name to the category of data it produces.
+WRITERTYPES  = {  'EvtCollectionStream'    : "tuples",
+                  'InputCopyStream'        : "events",
+                  'OutputStream'           : "events",
+                  'RecordStream'           : "records",
+                  'RunRecordStream'        : "records",
+                  'SequentialOutputStream' : "events",
+                  'TagCollectionStream'    : "tuples"   }
+
+# =============================================================================
+
+class MiniWriter( object ) :
+    '''
+    A class to represent a writer in the GaudiPython configuration
+    It can be non-trivial to access the name of the output file; it may be
+    specified in the DataSvc, or just on the writer, may be a list, or string
+    Also, there are three different types of writer (events, records, tuples)
+    so this bootstrap class provides easy access to this info while configuring
+    '''
+    def __init__( self, writer, wType, config ) :
+        # @param writer : the configurable for the writer itself
+        # @param wType  : category string ("events"/"records"/"tuples")
+        # @param config : full configuration, indexable by component name
+        self.w     = writer
+        self.wType = wType
+        # set parameters for directly accessing the correct
+        #   part of the configuration, so that later, we can do
+        #   config[ key ].Output = modified(output)
+        self.key          = None
+        self.output       = None
+        self.ItemList     = None
+        self.OptItemList  = None
+        #
+        self.wName = writer.getName()
+        # Now process the Writer, find where the output is named
+        self.woutput     = None
+        self.datasvcName = None
+        self.svcOutput   = None
+        if hasattr( self.w, "Output" ) :
+            self.woutput = self.w.Output
+            self.getItemLists( config )
+            self.set( self.wName, self.w.Output )
+            return
+        else :
+            # if there is no output file, get it via the datasvc
+            # (every writer always has a datasvc property)
+            self.datasvcName = self.w.EvtDataSvc
+            datasvc = config[ self.datasvcName ]
+            if hasattr( datasvc, "Output" ) :
+                self.getItemLists( config )
+                self.set( self.datasvcName, datasvc.Output )
+                return
+            # NOTE(review): if neither the writer nor its datasvc exposes
+            # an "Output" property, self.key/self.output remain None
+
+    def getNewName( self, replaceThis, withThis, extra='' ) :
+        # replace one pattern in the output name string
+        #  with another, and return the Output name
+        # It *might* be in a list, so check for this
+        #
+        # @param extra : might need to add ROOT flags
+        #                i.e.: OPT='RECREATE', or such
+        assert replaceThis.__class__.__name__ == 'str'
+        assert    withThis.__class__.__name__ == 'str'
+        old = self.output
+        lst = False
+        if old.__class__.__name__ == 'list' :
+            old = self.output[0]
+            lst = True
+        new = old.replace( replaceThis, withThis )
+        new += extra
+        if lst :
+            # preserve the original container shape: list in, list out
+            return [ new ]
+        else :
+            return new
+
+    def getItemLists( self, config ) :
+        # Collect ItemList/OptItemList from the writer itself, falling
+        # back to its datasvc when the writer does not carry them.
+        # the item list
+        if hasattr( self.w, "ItemList" ) :
+            self.ItemList = self.w.ItemList
+        else :
+            datasvc = config[ self.w.EvtDataSvc ]
+            if hasattr( datasvc, "ItemList" ) :
+                self.ItemList = datasvc.ItemList
+        # The option item list; possibly not a valid variable
+        if hasattr( self.w, "OptItemList" ) :
+            self.OptItemList = self.w.OptItemList
+        else :
+            datasvc = config[ self.w.EvtDataSvc ]
+            if hasattr( datasvc, "OptItemList" ) :
+                self.OptItemList = datasvc.OptItemList
+        return
+
+    def set( self, key, output ) :
+        # Record which config key owns the Output property, and its value.
+        self.key         = key
+        self.output      = output
+        return
+
+    def __repr__( self ) :
+        # Human-readable dump of everything discovered about this writer.
+        s  = ""
+        line = '-'*80
+        s += (line+'\n')
+        s += "Writer         : %s\n"%( self.wName  )
+        s += "Writer Type    : %s\n"%( self.wType  )
+        s += "Writer Output  : %s\n"%( self.output )
+        s += "DataSvc        : %s\n"%( self.datasvcName )
+        s += "DataSvc Output : %s\n"%( self.svcOutput   )
+        s += '\n'
+        s += "Key for config : %s\n"%( self.key    )
+        s += "Output File    : %s\n"%( self.output )
+        s += "ItemList       : %s\n"%( self.ItemList )
+        s += "OptItemList    : %s\n"%( self.OptItemList )
+        s += (line+'\n')
+        return s
+
+# =============================================================================
+
+class CollectHistograms( PyAlgorithm ) :
+    '''
+    GaudiPython algorithm used to clean up histos on the Reader and Workers
+    Only has a finalize method()
+    This retrieves a dictionary of path:histo objects and sends it to the
+    writer.  It then waits for a None flag : THIS IS IMPORTANT, as if
+    the algorithm returns before ALL histos have been COMPLETELY RECEIVED
+    at the writer end, there will be an error.
+    '''
+    def __init__( self, gmpcomponent ) :
+        PyAlgorithm.__init__( self )
+        self._gmpc = gmpcomponent
+        self.log = self._gmpc.log
+        return None
+    def execute( self ) :
+        # no per-event work; all activity happens in finalize()
+        return SUCCESS
+    def finalize( self ) :
+        self.log.info('CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
+        self._gmpc.hDict = self._gmpc.dumpHistograms( )
+        ks = self._gmpc.hDict.keys()
+        self.log.info('%i Objects in Histogram Store'%(len(ks)))
+        # crashes occurred due to Memory Error during the sending of hundreds
+        # histos on slc5 machines, so instead, break into chunks
+        # send 100 at a time
+        chunk = 100
+        # integer division (Python 2): always at least one iteration, so an
+        # empty trailing chunk may be sent; slicing makes that harmless
+        reps = len(ks)/chunk + 1
+        for i in xrange(reps) :
+            someKeys = ks[i*chunk : (i+1)*chunk]
+            smalld = dict( [(key, self._gmpc.hDict[key]) for key in someKeys] )
+            self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
+        # "finished" Notification
+        self.log.debug('Signalling end of histos to Writer')
+        self._gmpc.hq.put( 'HISTOS_SENT' )
+        self.log.debug( 'Waiting on Sync Event' )
+        self._gmpc.sEvent.wait()
+        self.log.debug( 'Histo Sync Event set, clearing and returning' )
+        self._gmpc.hvt.clearStore()
+        # re-seed the (now empty) histogram store with a fresh root node
+        root = gbl.DataObject()
+        setOwnership(root, False)
+        self._gmpc.hvt.setRoot( '/stat', root )
+        return SUCCESS
+
+# =============================================================================
+
+class EventCommunicator( object ) :
+    # This class is responsible for communicating Gaudi Events via Queues
+    # Events are communicated as TBufferFiles, filled either by the
+    # TESSerializer, or the GaudiSvc, "IPCSvc"
+    def __init__( self, GMPComponent, qin, qout ) :
+        # @param qin  : queue this node receives from (may be None-ended)
+        # @param qout : queue this node sends on
+        self._gmpc = GMPComponent
+        self.log   = self._gmpc.log
+        # maximum capacity of Queues
+        self.maxsize = 50
+        # central variables : Queues
+        self.qin  = qin
+        self.qout = qout
+
+        # flags
+        self.allsent = False
+        self.allrecv = False
+
+        # statistics storage
+        self.nSent    = 0    # counter : items sent
+        self.nRecv    = 0    # counter : items received
+        self.sizeSent = 0    # measure : size of events sent ( tbuf.Length() )
+        self.sizeRecv = 0    # measure : size of events in   ( tbuf.Length() )
+        self.qinTime  = 0    # time    : receiving from self.qin
+        self.qoutTime = 0    # time    : sending on qout
+
+    def send( self, item ) :
+        # This class manages the sending of a TBufferFile Event to a Queue
+        # The actual item to be sent is a tuple : ( evtNumber, TBufferFile )
+        assert item.__class__.__name__ == 'tuple'
+        startTransmission = time.time()
+        self.qout.put( item )
+        # allow the background thread to feed the Queue; not 100% guaranteed to
+        # finish before next line
+        # NOTE(review): _buffer is a multiprocessing.Queue internal --
+        # fragile across Python versions; confirm before upgrading
+        while self.qout._buffer : time.sleep( NAP )
+        self.qoutTime += time.time()-startTransmission
+        self.sizeSent += item[1].Length()
+        self.nSent += 1
+        return SUCCESS
+
+    def receive( self, timeout=None ) :
+        # Receive items from self.qin
+        # Returns the item, or None on timeout; non-tuple items (flags)
+        # are not counted in the received-event statistics.
+        startWait = time.time()
+        try :
+            itemIn = self.qin.get( timeout=timeout )
+        except Empty :
+            return None
+        self.qinTime += time.time()-startWait
+        self.nRecv += 1
+        if itemIn.__class__.__name__ == 'tuple' :
+            self.sizeRecv += itemIn[1].Length()
+        else :
+            self.nRecv -= 1
+        try :
+            self.qin.task_done()
+        except :
+            # bare except: deliberately best-effort -- over-calling
+            # task_done() is only warned about, never fatal
+            self._gmpc.log.warning('TASK_DONE called too often by : %s'\
+                                    %(self._gmpc.nodeType))
+        return itemIn
+
+    def finalize( self ) :
+        self.log.info('Finalize Event Communicator : %s'%(self._gmpc.nodeType))
+        # Reader sends one flag for each worker
+        # Workers send one flag each
+        # Writer sends nothing (it's at the end of the chain)
+        # NOTE(review): a nodeType outside {Reader,Writer,Worker} would
+        # leave `downstream` unbound and raise NameError below
+        if   self._gmpc.nodeType == 'Reader' : downstream = self._gmpc.nWorkers
+        elif self._gmpc.nodeType == 'Writer' : downstream = 0
+        elif self._gmpc.nodeType == 'Worker' : downstream = 1
+        for i in xrange(downstream) :
+            self.qout.put( 'FINISHED' )
+        if self._gmpc.nodeType != 'Writer' :
+            self.qout.join()
+        # Now some reporting...
+        self.statistics( )
+
+    def statistics( self ) :
+        # Log send/receive counters and cumulative queue wait times.
+        self.log.name = '%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
+        self.log.info ( 'Items Sent     : %i'%(self.nSent) )
+        self.log.info ( 'Items Received : %i'%(self.nRecv) )
+        self.log.info ( 'Data  Sent     : %i'%(self.sizeSent) )
+        self.log.info ( 'Data  Received : %i'%(self.sizeRecv) )
+        self.log.info ( 'Q-out Time     : %5.2f'%(self.qoutTime) )
+        self.log.info ( 'Q-in  Time     : %5.2f'%(self.qinTime ) )
+
+# =============================================================================
+
+class TESSerializer( object ) :
+    def __init__( self, gaudiTESSerializer, evtDataSvc,
+                        nodeType, nodeID, log ) :
+        self.T   = gaudiTESSerializer
+        self.evt = evtDataSvc
+        self.buffersIn  = []
+        self.buffersOut = []
+        self.nIn     = 0
+        self.nOut    = 0
+        self.tDump   = 0.0
+        self.tLoad   = 0.0
+        # logging
+        self.nodeType = nodeType
+        self.nodeID   = nodeID
+        self.log      = log
+    def Load( self, tbuf ) :
+        root = gbl.DataObject()
+        setOwnership( root, False )
+        self.evt.setRoot( '/Event', root )
+        t = time.time()
+        self.T.loadBuffer( tbuf )
+        self.tLoad   += (time.time() - t)
+        self.nIn     += 1
+        self.buffersIn.append( tbuf.Length() )
+    def Dump( self ) :
+        t = time.time()
+        tb = TBufferFile( TBuffer.kWrite )
+        self.T.dumpBuffer(tb)
+        self.tDump += ( time.time()-t )
+        self.nOut  += 1
+        self.buffersOut.append( tb.Length() )
+        return tb
+    def Report( self ) :
+        evIn       = "Events Loaded    : %i"%( self.nIn  )
+        evOut      = "Events Dumped    : %i"%( self.nOut )
+        din = sum( self.buffersIn )
+        dataIn     = "Data Loaded      : %i"%(din)
+        dataInMb   = "Data Loaded (MB) : %5.2f Mb"%(din/MB)
+        if self.nIn :
+            avgIn      = "Avg Buf Loaded   : %5.2f Mb"\
+                          %( din/(self.nIn*MB) )
+            maxIn      = "Max Buf Loaded   : %5.2f Mb"\
+                          %( max(self.buffersIn)/MB )
+        else :
+            avgIn      = "Avg Buf Loaded   : N/A"
+            maxIn      = "Max Buf Loaded   : N/A"
+        dout = sum( self.buffersOut )
+        dataOut    = "Data Dumped      : %i"%(dout)
+        dataOutMb  = "Data Dumped (MB) : %5.2f Mb"%(dout/MB)
+        if self.nOut :
+            avgOut     = "Avg Buf Dumped   : %5.2f Mb"\
+                          %( din/(self.nOut*MB) )
+            maxOut     = "Max Buf Dumped   : %5.2f Mb"\
+                          %( max(self.buffersOut)/MB )
+        else :
+            avgOut     = "Avg Buf Dumped   : N/A"
+            maxOut     = "Max Buf Dumped   : N/A"
+        dumpTime   = "Total Dump Time  : %5.2f"%( self.tDump )
+        loadTime   = "Total Load Time  : %5.2f"%( self.tLoad )
+
+        lines =  evIn     ,\
+                 evOut    ,\
+                 dataIn   ,\
+                 dataInMb ,\
+                 avgIn    ,\
+                 maxIn    ,\
+                 dataOut  ,\
+                 dataOutMb,\
+                 avgOut   ,\
+                 maxOut   ,\
+                 dumpTime ,\
+                 loadTime
+        self.log.name = "%s-%i TESSerializer"%(self.nodeType, self.nodeID)
+        for line in lines :
+            self.log.info( line )
+        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
+
+# =============================================================================
+
+class GMPComponent( object ) :
+    # This class will be the template for Reader, Worker and Writer
+    # containing all common components: queues, synchronisation Events,
+    # counters, timers and the forked Process object.
+    # nodeId will be a numerical identifier for the node
+    # -1 for reader
+    # -2 for writer
+    # 0,...,nWorkers-1 for the Workers
+    def __init__( self, nodeType, nodeID, queues, events, params  ) :
+        # declare a Gaudi MultiProcessing Node
+        # the nodeType is going to be one of Reader, Worker, Writer
+        # qPair is going to be a tuple of ( qin, qout )
+        # for sending and receiving
+        # if nodeType is "Writer", it will be a list of qPairs,
+        # as there's one queue-in from each Worker
+        #
+        # params is a tuple of (nWorkers, histSyncEvent, config, log)
+
+        self.nodeType = nodeType
+        current_process().name = nodeType
+
+        # Synchronisation Event() objects for keeping track of the system
+        self.initEvent, eventLoopSyncer, self.finalEvent = events
+        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer   # unpack tuple
+
+        # necessary for knowledge of the system
+        self.nWorkers, self.sEvent, self.config, self.log = params
+        self.nodeID   = nodeID
+
+        # describe the state of the node by the current Event Number
+        self.currentEvent = None
+
+        # Unpack the Queues : (events, histos, filerecords)
+        qPair, histq, fq = queues
+
+        # Set up the Queue Mechanisms ( Event Communicators )
+        if self.nodeType == 'Reader' or self.nodeType == 'Worker' :
+            # Reader or Worker Node
+            qin, qout = qPair
+            self.evcom = EventCommunicator( self, qin, qout )
+        else :
+            # Writer : many queues in, no queue out
+            assert self.nodeType == 'Writer'
+            self.evcoms = []
+            qsin = qPair[0]
+            for q in qsin :
+                ec = EventCommunicator( self, q, None )
+                self.evcoms.append( ec )
+        # Histogram Queue
+        self.hq = histq
+        # FileRecords Queue
+        self.fq = fq
+
+        # Universal Counters (available to all nodes)
+        # Use sensibly!!!
+        self.nIn  = 0
+        self.nOut = 0
+
+        # Status Flag (possibly remove later)
+        self.stat = SUCCESS
+
+        # Set logger name
+        self.log.name = '%s-%i'%(self.nodeType, self.nodeID)
+
+        # Heuristic variables
+        # time for init, run, final, firstEventTime, totalTime
+        self.iTime       = 0.0
+        self.rTime       = 0.0
+        self.fTime       = 0.0
+        self.firstEvTime = 0.0
+        self.tTime       = 0.0
+
+        # define the separate process
+        self.proc = Process( target=self.Engine )
+
+    def Start( self ) :
+        # Fork and start the separate process
+        self.proc.start()
+
+    def Engine( self ) :
+        # This will be the method carried out by the Node
+        # Different for all ; overridden by Reader/Worker/Writer
+        pass
+
+    def processConfiguration( self ) :
+        # Different for all ; customize Configuration for multicore
+        pass
+
+    def SetupGaudiPython( self ) :
+        # This method will initialize the GaudiPython Tools
+        # such as the AppMgr and so on
+        self.a   = AppMgr()
+        if SMAPS :
+            # optional memory-profiling algorithm, one log file per node
+            from AlgSmapShot import SmapShot
+            smapsLog = self.nodeType+'-'+str(self.nodeID)+'.smp'
+            ss = SmapShot( logname=smapsLog )
+            self.a.addAlgorithm( ss )
+        self.evt = self.a.evtsvc()
+        self.hvt = self.a.histsvc()
+        self.fsr = self.a.filerecordsvc()
+        self.inc = self.a.service('IncidentSvc','IIncidentSvc')
+        self.pers = self.a.service( 'EventPersistencySvc', 'IAddressCreator' )
+        self.ts  = gbl.GaudiMP.TESSerializer( self.evt._idp, self.pers )
+        self.TS  = TESSerializer( self.ts, self.evt,
+                                  self.nodeType, self.nodeID, self.log )
+        return SUCCESS
+
+    def StartGaudiPython( self ) :
+        # Initialize and start the application manager.
+        # NOTE(review): the StatusCodes returned by initialize()/start()
+        # are not checked here -- confirm whether failures should abort.
+        self.a.initialize()
+        self.a.start()
+        return SUCCESS
+
+    def LoadTES( self, tbufferfile ) :
+        # Deserialise an event from a TBufferFile into a fresh '/Event' root.
+        root = gbl.DataObject()
+        setOwnership(root, False)
+        self.evt.setRoot( '/Event', root )
+        self.ts.loadBuffer(tbufferfile)
+
+    def getEventNumber( self ) :
+        # Best-effort extraction of the current event number; returns -1 if
+        # it cannot be determined (bare excepts below are deliberate).
+        # Using getList or getHistoNames can result in the EventSelector
+        # re-initialising connection to RootDBase, which costs a lot of
+        # time... try to build a set of Header paths??
+
+        # First Attempt : Unpacked Event Data
+        lst = [ '/Event/Gen/Header',
+                '/Event/Rec/Header' ]
+        for l in lst :
+            path = l
+            try :
+                n = self.evt[path].evtNumber()
+                return n
+            except :
+                # No evt number at this path
+                continue
+
+        # Second attempt : try DAQ/RawEvent data
+        # The Evt Number is in bank type 16, bank 0, data pt 4
+        try :
+            n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
+            return n
+        except :
+            pass
+
+        # Default Action : only warn once, before any event has flowed
+        if self.nIn > 0 or self.nOut > 0 :
+            pass
+        else :
+            self.log.warning('Could not determine Event Number')
+        return -1
+
+    def IdentifyWriters( self ) :
+        #
+        # Identify Writers in the Configuration
+        # Returns a dict { writerType : [MiniWriter, ...] } keyed by the
+        # categories below ("histos" holds the histogram output file name).
+        #
+        d = {}
+        keys = [ "events", "records", "tuples", "histos" ]
+        for k in keys :
+            d[k] = []
+
+        # Identify Writers and Classify
+        wkeys = WRITERTYPES.keys()
+        for v in self.config.values() :
+            if v.__class__.__name__ in wkeys :
+                writerType = WRITERTYPES[ v.__class__.__name__ ]
+                d[writerType].append( MiniWriter(v, writerType, self.config) )
+                if self.nodeID == 0 :
+                    self.log.info('Writer Found : %s'%(v.name()))
+
+        # Now Check for the Histogram Service
+        if 'HistogramPersistencySvc' in  self.config.keys() :
+            hfile =self.config['HistogramPersistencySvc'].getProp('OutputFile')
+            d[ "histos" ].append( hfile )
+        return d
+
+    def dumpHistograms( self ) :
+        '''
+        Method used by the GaudiPython algorithm CollectHistos
+        to obtain a dictionary of form { path : object }
+        representing the Histogram Store
+        '''
+        nlist = self.hvt.getHistoNames( )
+        histDict = {}
+        objects = 0 ; histos = 0
+        if nlist :
+            for n in nlist :
+                o = self.hvt[ n ]
+                if type(o) in aidatypes :
+                    # convert AIDA histograms to ROOT for transfer
+                    o = aida2root(o)
+                    histos  += 1
+                else :
+                    objects += 1
+                histDict[ n ] = o
+        else :
+            print 'WARNING : no histograms to recover?'
+        return histDict
+
+    def Initialize( self ) :
+        # Common initialisation sequence; records elapsed time in iTime.
+        start = time.time()
+        self.processConfiguration( )
+        self.SetupGaudiPython( )
+        # Set the initialisation flag!
+        self.initEvent.set()
+        self.StartGaudiPython( )
+        self.iTime = time.time() - start
+
+    def Finalize( self ) :
+        # Stop/finalize the AppMgr and signal completion; records fTime.
+        start = time.time()
+        self.a.stop()
+        self.a.finalize()
+        self.log.info( '%s-%i Finalized'%(self.nodeType, self.nodeID) )
+        self.finalEvent.set()
+        self.fTime = time.time() - start
+
+    def Report( self ) :
+        # Log the timing summary for this node, then delegate to the
+        # TESSerializer report.
+        self.log.name = "%s-%i Audit"%(self.nodeType, self.nodeID)
+        allTime  = "Alive Time     : %5.2f"%(self.tTime)
+        initTime = "Init Time      : %5.2f"%(self.iTime)
+        frstTime = "1st Event Time : %5.2f"%(self.firstEvTime)
+        runTime  = "Run Time       : %5.2f"%(self.rTime)
+        finTime  = "Finalise Time  : %5.2f"%(self.fTime)
+        tup = ( allTime, initTime, frstTime, runTime, finTime )
+        for t in tup :
+            self.log.info( t )
+        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
+        # and report from the TESSerializer
+        self.TS.Report()
+
+# =============================================================================
+
+class Reader( GMPComponent )  :
+    # Node that reads events from the input, serialises each one and
+    # distributes it to the Workers via the shared queue.
+    def __init__( self, queues, events, params ) :
+        GMPComponent.__init__(self, 'Reader', -1, queues, events, params )
+
+    def processConfiguration( self ) :
+        # Reader :
+        #   No algorithms
+        #   No output
+        #   No histos
+        self.config[ 'ApplicationMgr' ].TopAlg    = []
+        self.config[ 'ApplicationMgr' ].OutStream = []
+        if "HistogramPersistencySvc" in self.config.keys() :
+            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
+        self.config['MessageSvc'].Format    = '[Reader]% F%18W%S%7W%R%T %0W%M'
+        self.evtMax = self.config[ 'ApplicationMgr' ].EvtMax
+
+    def DumpEvent( self ) :
+        # Serialise the current event store into a new TBufferFile.
+        tb = TBufferFile( TBuffer.kWrite )
+        # print '----Reader dumping Buffer!!!'
+        self.ts.dumpBuffer( tb )
+        # print '\tBuffer Dumped, size : %i'%( tb.Length() )
+        return tb
+
+    def DoFirstEvent( self ) :
+        # Do First Event ------------------------------------------------------
+        # Handles the first event specially: the TESSerializer item list is
+        # populated from the freshly read event, and the time to first event
+        # is recorded. Returns SUCCESS/FAILURE.
+        # Check Termination Criteria
+        startFirst = time.time()
+        self.log.info('Reader : First Event')
+        if self.nOut == self.evtMax :
+            self.log.info('evtMax( %i ) reached'%(self.evtMax))
+            self.lastEvent.set()
+            return SUCCESS
+        else :
+            # Continue to read, dump and send event
+            self.a.run(1)
+            if not bool(self.evt['/Event']) :
+                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
+                self.lastEvent.set()
+                return SUCCESS
+            else :
+                # Populate TESSerializer list and send Event
+                try :
+                    lst = self.evt.getList()
+                except :
+                    self.log.critical('Reader could not acquire TES List!')
+                    self.lastEvent.set()
+                    return FAILURE
+                self.log.info('Reader : TES List : %i items'%(len(lst)))
+                for l in lst :
+                    self.ts.addItem(l)
+                self.currentEvent = self.getEventNumber( )
+                tb = self.TS.Dump( )
+                self.log.info('First Event Sent')
+                self.evcom.send( (self.currentEvent, tb) )
+                self.nOut += 1
+                self.eventLoopSyncer.set()
+                self.evt.clearStore( )
+                self.firstEvTime = time.time()-startFirst
+                return SUCCESS
+
+    def Engine( self ) :
+        # Main loop of the Reader process: read, serialise and send every
+        # event until evtMax or end-of-input, then finalise and report.
+
+        startEngine = time.time()
+        self.log.name = 'Reader'
+        self.log.info('Reader Process starting')
+
+        self.Initialize()
+
+        # add the Histogram Collection Algorithm
+        self.a.addAlgorithm( CollectHistograms(self) )
+
+        self.log.info('Reader Beginning Distribution')
+        sc = self.DoFirstEvent( )
+        if sc.isSuccess() :
+            self.log.info('Reader First Event OK')
+        else :
+            self.log.critical('Reader Failed on First Event')
+            self.stat = FAILURE
+
+        # Do All Others -------------------------------------------------------
+        while True :
+            # Check Termination Criteria
+            if self.nOut == self.evtMax :
+                self.log.info('evtMax( %i ) reached'%(self.evtMax))
+                break
+            # Check Health
+            if not self.stat.isSuccess() :
+                self.log.critical( 'Reader is Damaged!' )
+                break
+            # Continue to read, dump and send event
+            t = time.time()
+            self.a.run(1)
+            self.rTime += (time.time()-t)
+            if not bool(self.evt['/Event']) :
+                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
+                break
+            self.currentEvent = self.getEventNumber( )
+            tb = self.TS.Dump( )
+            self.evcom.send( (self.currentEvent, tb) )
+            # clean up
+            self.nOut += 1
+            self.eventLoopSyncer.set()
+            self.evt.clearStore( )
+        self.log.info('Setting <Last> Event')
+        self.lastEvent.set()
+
+        # Finalize
+        self.log.info( 'Reader : Event Distribution complete.' )
+        self.evcom.finalize()
+        self.Finalize()
+        self.tTime = time.time() - startEngine
+        self.Report()
+
+# =============================================================================
+
+class Worker( GMPComponent ) :
+    # Node that receives serialised events from the Reader, runs the
+    # configured algorithms on them and forwards accepted events to the
+    # Writer.
+    def __init__( self, workerID, queues, events, params ) :
+        GMPComponent.__init__(self,'Worker', workerID, queues, events, params)
+        # Identify the writer streams
+        self.writerDict = self.IdentifyWriters( )
+        # Identify the accept/veto checks for each event
+        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
+        self.log.debug("Worker-%i Created OK"%(self.nodeID))
+        self.eventOutput = True
+
+    def processConfiguration( self ) :
+        # Worker :
+        #   No input
+        #   No output
+        #   No Histos
+        self.config[ 'EventSelector'  ].Input     = []
+        self.config[ 'ApplicationMgr' ].OutStream = []
+        if "HistogramPersistencySvc" in self.config.keys() :
+            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
+        formatHead = '[Worker-%i] '%(self.nodeID)
+        self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'
+
+        for key, lst in self.writerDict.iteritems() :
+            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )
+
+        for m in self.writerDict[ "tuples" ] :
+            # rename Tuple output file with an appendix
+            # based on worker id, for merging later
+            newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
+            self.config[ m.key ].Output = newName
+
+        # Suppress INFO Output for all but Worker-0
+        if self.nodeID == 0 :
+            pass
+        else                :
+            self.config[ 'MessageSvc' ].OutputLevel = ERROR
+
+    def Engine( self ) :
+        # Main loop of the Worker process: receive events until the
+        # 'FINISHED' sentinel arrives, execute each one, apply the
+        # accept/require/veto checks and forward passing events.
+        startEngine = time.time()
+        self.log.name = "Worker-%i"%(self.nodeID)
+        self.log.info("Worker %i starting Engine"%(self.nodeID))
+        self.Initialize()
+        self.filerecordsAgent = FileRecordsAgent(self)
+
+        # populate the TESSerializer itemlist
+        self.log.info('EVT WRITERS ON WORKER : %i'\
+                       %( len(self.writerDict['events'])))
+
+        nEventWriters = len( self.writerDict[ "events" ] )
+        if nEventWriters :
+            for m in self.writerDict[ "events" ] :
+                for item in m.ItemList :
+                    self.log.debug(' adding ItemList Item to ts : %s'%(item))
+                    self.ts.addItem( item )
+                for item in m.OptItemList :
+                    self.log.debug(' adding Optional Item to ts : %s'%(item))
+                    self.ts.addOptItem( item )
+        else :
+            self.log.info( 'There is no Event Output for this app' )
+            self.eventOutput = False
+
+        # add the Histogram Collection Algorithm
+        self.a.addAlgorithm( CollectHistograms(self) )
+
+        # Begin processing
+        self.log.name = "Worker-%i"%(self.nodeID)
+        Go = True
+        while Go :
+            packet = self.evcom.receive( )
+            if packet : pass
+            else      : continue
+            if packet == 'FINISHED' : break
+            evtNumber, tbin = packet    # unpack
+            self.nIn += 1
+            self.TS.Load( tbin )
+            # print 'Worker-%i : Event %i'%(self.nodeID, evtNumber)
+            t = time.time()
+            sc = self.a.executeEvent()
+            if self.nIn == 1 :
+                # the first event carries one-off setup cost; time it apart
+                self.firstEvTime = time.time()-t
+            else :
+                self.rTime += (time.time()-t)
+            if sc.isSuccess() :
+                pass
+            else :
+                self.log.warning('Did not Execute Event')
+                self.evt.clearStore()
+                continue
+            if self.isEventPassed() :
+                pass
+            else :
+                self.log.warning( 'Event did not pass : %i'%(evtNumber) )
+                self.evt.clearStore()
+                continue
+            if self.eventOutput :
+                # It may be the case of generating Event Tags; hence
+                #   no event output
+                self.currentEvent = self.getEventNumber( )
+                tb = self.TS.Dump( )
+                self.evcom.send( (self.currentEvent, tb) )
+                self.nOut += 1
+            self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
+            self.eventLoopSyncer.set()
+            self.evt.clearStore( )
+        self.log.info('Setting <Last> Event')
+        self.lastEvent.set()
+
+        self.evcom.finalize()
+        self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
+        # Now send the FileRecords and stop/finalize the appMgr
+        self.filerecordsAgent.SendFileRecords()
+        self.Finalize()
+        self.tTime = time.time()-startEngine
+        self.Report()
+
+    def getCheckAlgs( self ) :
+        '''
+        For some output writers, a check is performed to see if the event has
+        executed certain algorithms.
+        These reside in the AcceptAlgs property for those writers
+        '''
+        acc = []
+        req = []
+        vet = []
+        for m in self.writerDict[ "events" ] :
+            if hasattr(m.w, 'AcceptAlgs')  : acc += m.w.AcceptAlgs
+            if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
+            if hasattr(m.w, 'VetoAlgs')    : vet += m.w.VetoAlgs
+        return (acc, req, vet)
+
+
+    def checkExecutedPassed( self, algName ) :
+        # True iff the named algorithm both executed and passed its filter
+        # for the current event.
+        if  self.a.algorithm( algName )._ialg.isExecuted()\
+        and self.a.algorithm( algName )._ialg.filterPassed() :
+            return True
+        else :
+            return False
+
+    def isEventPassed( self ) :
+        '''
+        Check the algorithm status for an event.
+        Depending on output writer settings, the event
+          may be declined based on various criteria.
+        This is a transcript of the check that occurs in GaudiSvc::OutputStream
+        '''
+        passed = False
+
+        self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
+        if self.acceptAlgs :
+            for name in self.acceptAlgs :
+                if self.checkExecutedPassed( name ) :
+                    passed = True
+                    break
+        else :
+            passed = True
+
+        self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
+        for name in self.requireAlgs :
+            if self.checkExecutedPassed( name ) :
+                pass
+            else :
+                self.log.info('Evt declined (requireAlgs) : %s'%(name) )
+                passed = False
+
+        self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
+        for name in self.vetoAlgs :
+            if self.checkExecutedPassed( name ) :
+                pass
+            else :
+                self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
+                passed = False
+        return passed
+
+# =============================================================================
+
+class Writer( GMPComponent ) :
+    def __init__( self, queues, events, params ) :
+        GMPComponent.__init__(self,'Writer', -2, queues, events, params)
+        # Identify the writer streams
+        self.writerDict = self.IdentifyWriters( )
+        # This keeps track of workers as they finish
+        self.status = [False]*self.nWorkers
+        self.log.name = "Writer--2"
+
+    def processConfiguration( self ) :
+        # Writer :
+        #   No input
+        #   No Algs
+        self.config[ 'ApplicationMgr' ].TopAlg = []
+        self.config[ 'EventSelector'  ].Input  = []
+
+        self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'
+
+        # Now process the output writers
+        for key, lst in self.writerDict.iteritems() :
+            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )
+
+        # Modify the name of the output file to reflect that it came
+        #  from a parallel processing
+        #
+        # Event Writers
+        for m in self.writerDict[ "events" ] :
+            self.log.debug( 'Processing Event Writer : %s'%(m) )
+            newName = m.getNewName( '.', '.p%i.'%self.nWorkers )
+            self.config[ m.key ].Output = newName
+
+        # Now, if there are no event writers, the FileRecords file
+        #   will fail to open, as it only opens an UPDATE version
+        #   of the existing Event Output File
+        # So, if there are no event writers, edit the string of the
+        #   FileRecord Writer
+
+        # FileRecords Writers
+        for m in self.writerDict[ "records" ] :
+            self.log.debug( 'Processing FileRecords Writer: %s'%(m) )
+            newName = m.getNewName( '.', '.p%i.'%self.nWorkers,
+                                       extra=" OPT='RECREATE'" )
+            self.config[ m.key ].Output = newName
+
+        # same for histos
+        hs = "HistogramPersistencySvc"
+        n = None
+        if hs in self.config.keys() :
+            n = self.config[ hs ].OutputFile
+        if n :
+            newName=self.config[hs].OutputFile.replace('.',\
+                                                       '.p%i.'%(self.nWorkers))
+            self.config[ hs ].OutputFile = newName
+
+    def Engine( self ) :
+        startEngine = time.time()
+        self.Initialize()
+        self.histoAgent = HistoAgent( self )
+        self.filerecordsAgent = FileRecordsAgent( self )
+
+        # Begin processing
+        Go      = True
+        current = -1
+        stopCriteria = self.nWorkers
+        while Go :
+            current = (current+1)%self.nWorkers
+            packet = self.evcoms[current].receive( timeout=0.01 )
+            if packet == None :
+                continue
+            if packet == 'FINISHED' :
+                self.log.info('Writer got FINISHED flag : Worker %i'%(current))
+                self.status[current] = True
+                if all(self.status) :
+                    self.log.info('FINISHED recd from all workers, break loop')
+                    break
+                continue
+            # otherwise, continue as normal
+            self.nIn += 1    # update central count (maybe needed by FSR store)
+            evtNumber, tbin = packet    # unpack
+            self.TS.Load( tbin )
+            t  = time.time()
+            self.a.executeEvent()
+            self.rTime += ( time.time()-t )
+            self.currentEvent = self.getEventNumber( )
+            self.evt.clearStore( )
+            self.eventLoopSyncer.set()
+        self.log.name = "Writer--2"
+        self.log.info('Setting <Last> Event')
+        self.lastEvent.set()
+
+        # finalisation steps
+        [ e.finalize() for e in self.evcoms ]
+        # Now do Histograms
+        sc = self.histoAgent.Receive()
+        sc = self.histoAgent.RebuildHistoStore()
+        if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
+        else              : self.log.warning( 'Histo Store Error in Rebuild' )
+
+        # Now do FileRecords
+        sc = self.filerecordsAgent.Receive()
+        self.filerecordsAgent.Rebuild()
+        self.Finalize()
+        self.rTime = time.time()-startEngine
+        self.Report()
+
+# =============================================================================
+
+
+
+# =============================================================================
+
+class Coord( object ) :
+    # Co-ordinator of the whole parallel system: builds the queues, the
+    # synchronisation Syncers and the Reader/Worker/Writer processes, then
+    # drives them through Initialise/Run/Finalise.
+    def __init__( self, nWorkers, config, log ) :
+
+        self.log = log
+        self.config = config
+        # set up Logging
+        self.log.name = 'GaudiPython-Parallel-Logger'
+        self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )
+
+        if nWorkers == -1 :
+            # The user has requested all available cpus in the machine
+            self.nWorkers = cpu_count()
+        else :
+            self.nWorkers = nWorkers
+
+
+        self.qs = self.SetupQueues( )    # a dictionary of queues (for Events)
+        self.hq = JoinableQueue( )       # for Histogram data
+        self.fq = JoinableQueue( )       # for FileRecords data
+
+        # Make a Syncer for Initialise, Run, and Finalise
+        self.sInit = Syncer( self.nWorkers, self.log,
+                             limit=WAIT_INITIALISE,
+                             step=STEP_INITIALISE    )
+        self.sRun  = Syncer( self.nWorkers, self.log,
+                             manyEvents=True,
+                             limit=WAIT_SINGLE_EVENT,
+                             step=STEP_EVENT,
+                             firstEvent=WAIT_FIRST_EVENT )
+        self.sFin  = Syncer( self.nWorkers, self.log,
+                             limit=WAIT_FINALISE,
+                             step=STEP_FINALISE )
+        # and one final one for Histogram Transfer
+        self.histSyncEvent = Event()
+
+        # params are common to all subprocesses
+        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
+
+        # Declare SubProcesses!
+        self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params)
+        self.workers = []
+        for i in xrange( self.nWorkers ) :
+            wk = Worker( i, self.getQueues(i), self.getSyncEvents(i), params )
+            self.workers.append( wk )
+        self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params)
+
+        self.system = []
+        self.system.append(self.writer)
+        [ self.system.append(w) for w in self.workers ]
+        self.system.append(self.reader)
+
+    def getSyncEvents( self, nodeID ) :
+        # Bundle the init/run/finalise sync Events for the given node.
+        init = self.sInit.d[nodeID].event
+        run  = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
+        fin  = self.sFin.d[nodeID].event
+        return ( init, run, fin )
+
+    def getQueues( self, nodeID ) :
+        # Bundle the (event, histogram, filerecords) queues for a node.
+        eventQ = self.qs[ nodeID ]
+        histQ  = self.hq
+        fsrQ   = self.fq
+        return ( eventQ, histQ, fsrQ )
+
+    def Go( self ) :
+        # Run the full Initialise/Run/Finalise cycle; returns SUCCESS, or
+        # FAILURE after terminating all subprocesses on any sync timeout.
+
+        # Initialise
+        self.log.name = 'GaudiPython-Parallel-Logger'
+        self.log.info( 'INITIALISING SYSTEM' )
+        for p in self.system :
+            p.Start()
+        sc = self.sInit.syncAll(step="Initialise")
+        if sc : pass
+        else  : self.Terminate() ; return FAILURE
+
+        # Run
+        self.log.name = 'GaudiPython-Parallel-Logger'
+        self.log.info( 'RUNNING SYSTEM' )
+        sc = self.sRun.syncAll(step="Run")
+        if sc : pass
+        else  : self.Terminate() ; return FAILURE
+
+        # Finalise
+        self.log.name = 'GaudiPython-Parallel-Logger'
+        self.log.info( 'FINALISING SYSTEM' )
+        sc = self.sFin.syncAll(step="Finalise")
+        if sc : pass
+        else  : self.Terminate() ; return FAILURE
+
+        # if we've got this far, finally report SUCCESS
+        self.log.info( "Cleanly join all Processes" )
+        self.Stop()
+        self.log.info( "Report Total Success to Main.py" )
+        return SUCCESS
+
+    def Terminate( self ) :
+        # Brutally kill sub-processes
+        self.writer.proc.terminate()
+        [ w.proc.terminate() for w in self.workers]
+        self.reader.proc.terminate()
+
+    def Stop( self ) :
+        # procs should be joined in reverse order to launch
+        self.system.reverse()
+        for s in self.system :
+            s.proc.join()
+        return SUCCESS
+
+    def SetupQueues( self ) :
+        # This method will set up the network of Queues needed
+        # N Queues = nWorkers + 1
+        # Each Worker has a Queue in, and a Queue out
+        # Reader has Queue out only
+        # Writer has nWorkers Queues in
+
+        # one queue from Reader-Workers
+        rwk = JoinableQueue()
+        # one queue from each worker to writer
+        workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
+        d = {}
+        d[-1] = (None, rwk)              # Reader
+        d[-2] = (workersWriter, None)    # Writer
+        for i in xrange(self.nWorkers) : d[i] =  (rwk, workersWriter[i])
+        return d
+
+# ============================= EOF ===========================================

python/GaudiMP/IoRegistry.py

+## @file GaudiMP.IoRegistry
+## @purpose hold I/O registration information
+## @author Sebastien Binet <binet@cern.ch>
+
+from FdsRegistry import FdsDict
+
+class IoRegistry (object):
+    """Singleton class to hold I/O registration and fds information"""
+    # class-level state shared by all users of the registry
+    instances = dict() # { 'io-comp-name' : {'oldfname':'newfname',...},... }
+    fds_dict = FdsDict()   # process-wide file-descriptor bookkeeping
+    pass # IoRegistry

python/GaudiMP/Parallel.py

+# File: GaudiMP/Parallel.py
+# Author: Pere Mato (pere.mato@cern.ch)
+
+""" GaudiMP.Parallel module.
+    This module provides 'parallel' processing support for GaudiPython.
+    It is adding some sugar on top of public domain packages such as
+    the 'multiprocessing' or the 'pp' packages. The interface can be made
+    independent of the underlying implementation package.
+    Two main class are defined: Task and WorkManager
+"""
+
+__all__ = [ 'Task','WorkManager' ]
+excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
+
+import sys, os, time, copy
+import multiprocessing
+
+def _prefunction( f, task, item) :
+    return f((task,item))
+def _ppfunction( args ) :
+    #--- Unpack arguments
+    task, item = args
+    stat = Statistics()
+    #--- Initialize the remote side (at least once)
+    if not task.__class__._initializeDone :
+        for k,v in task.environ.items() :
+            if k not in excluded_varnames : os.environ[k] = v
+        task.initializeRemote()
+        task.__class__._initializeDone = True
+    #--- Reset the task output
+    task._resetOutput()
+    #--- Call processing
+    task.process(item)
+    #--- Collect statistics
+    stat.stop()
+    return (copy.deepcopy(task.output), stat)
+
+class Statistics(object):
+    def __init__(self):
+        self.name  = os.getenv('HOSTNAME')
+        self.start = time.time()
+        self.time  = 0.0
+        self.njob  = 0
+    def stop(self):
+        self.time = time.time() - self.start
+
+class Task(object) :
+    """ Basic base class to encapsulate any processing that is going to be porcessed in parallel.
+        User class much inherit from it and implement the methods initializeLocal,
+        initializeRemote, process and finalize.   """
+    _initializeDone = False
+    def __new__ ( cls, *args, **kwargs ):
+        task = object.__new__( cls )
+        task.output = ()
+        task.environ = {}
+        for k,v in os.environ.items(): task.environ[k] = v
+        task.cwd = os.getcwd()
+        return task
+    def initializeLocal(self):
+        pass
+    def initializeRemote(self):
+        pass
+    def process(self, item):
+        pass
+    def finalize(self) :
+        pass
+    def _mergeResults(self, result) :
+        if type(result) is not type(self.output) :
+            raise TypeError("output type is not same as obtained result")
+        #--No iteratable---
+        if not hasattr( result , '__iter__' ):
+            if hasattr(self.output,'Add') : self.output.Add(result)
+            elif hasattr(self.output,'__iadd__') : self.output += result
+            elif hasattr(self.output,'__add__') : self.output = self.output + result
+            else : raise TypeError('result cannot be added')
+        #--Dictionary---
+        elif type(result) is dict :
+            if self.output.keys() <= result.keys(): minkeys = self.output.keys()
+            else: minkeys = result.keys()
+            for key in result.keys() :
+                if key in self.output :
+                    if hasattr(self.output[key],'Add') : self.output[key].Add(result[key])
+                    elif hasattr(self.output[key],'__iadd__') : self.output[key] += result[key]
+                    elif hasattr(self.output[key],'__add__') : self.output[key] = self.output[key] + result[key]
+                    else : raise TypeError('result cannot be added')
+                else :
+                    self.output[key] = result[key]
+        #--Anything else (list)
+        else :
+            for i in range( min( len(self.output) , len(result)) ):
+                if hasattr(self.output[i],'Add') : self.output[i].Add(result[i])
+                elif hasattr(self.output[i],'__iadd__') : self.output[i] += result[i]
+                elif hasattr(self.output[i],'__add__') : self.output[i] = self.output[i] + result[i]
+                else : raise TypeError('result cannot be added')
+    def _resetOutput(self):
+        output =  (type(self.output) is dict) and self.output.values() or self.output
+        for o in output :
+            if hasattr(o, 'Reset'): o.Reset()
+
+
+class WorkManager(object) :
+    """ Class to in charge of managing the tasks and distributing them to
+        the workers. They can be local (using other cores) or remote
+        using other nodes in the local cluster """
+
+    def __init__( self, ncpus='autodetect', ppservers=None) :
+        # ncpus     : number of local worker processes, or 'autodetect' to
+        #             use every core reported by multiprocessing.
+        # ppservers : optional list of remote host names; when given, the
+        #             'pp' (parallel python) cluster mode is used instead of
+        #             a local multiprocessing.Pool.
+        if ncpus == 'autodetect' : self.ncpus = multiprocessing.cpu_count()
+        else :                     self.ncpus = ncpus
+        if ppservers :
+            import pp
+            self.ppservers = ppservers
+            # one ssh session per host; each launches a remote ppserver
+            self.sessions = [ SshSession(srv) for srv in ppservers ]
+            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
+            self.mode = 'cluster'
+        else :
+            self.pool = multiprocessing.Pool(self.ncpus)
+            self.mode = 'multicore'
+        self.stats = {}   # per-host Statistics, keyed by host name
+
+    def __del__(self):
+        if hasattr(self,'server') : self.server.destroy()
+
+    def process(self, task, items, timeout=90000):
+        # Run task.process(item) for every item in `items`, in parallel,
+        # merging each job's partial output and timing statistics back into
+        # `task` / self.stats.  `timeout` (seconds) applies to the multicore
+        # mode only, via AsyncResult.get().
+        if not isinstance(task,Task) :
+            raise TypeError("task argument needs to be an 'Task' instance")
+        # --- Call the Local initialialization
+        task.initializeLocal()
+        # --- Schedule all the jobs ....
+        if self.mode == 'cluster' :
+            jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiMP.Parallel','time')) for item in items]
+            for job in jobs :
+                result, stat = job()
+                task._mergeResults(result)
+                self._mergeStatistics(stat)
+            self._printStatistics()
+            self.server.print_stats()
+        elif self.mode == 'multicore' :
+            start = time.time()
+            jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items ))
+            for result, stat in  jobs.get(timeout) :
+                task._mergeResults(result)
+                self._mergeStatistics(stat)
+            end = time.time()
+            self._printStatistics()
+            print 'Time elapsed since server creation %f' %(end-start)
+        # --- Call the Local Finalize
+        task.finalize()
+    def _printStatistics(self):
+        # Print a per-host summary table of job counts and timings.
+        # NOTE(review): divides by njobs and stat.njob -- raises
+        # ZeroDivisionError if no job ran; confirm callers always run >= 1 job.
+        njobs = 0
+        for stat in self.stats.values():
+            njobs += stat.njob
+        print 'Job execution statistics:'
+        print 'job count | % of all jobs | job time sum | time per job | job server'
+        for name, stat  in self.stats.items():
+            print '       %d |        %6.2f |     %8.3f |    %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name)
+
+    def _mergeStatistics(self, stat):
+        # Fold one job's Statistics into the per-host accumulated totals.
+        if stat.name not in self.stats : self.stats[stat.name] = Statistics()
+        s = self.stats[stat.name]
+        s.time += stat.time
+        s.njob += 1
+
+
+class SshSession(object) :
+    def __init__(self, hostname):
+        import pyssh
+        import pp
+        self.host = hostname
+        ppprefix =  os.path.dirname(os.path.dirname(pp.__file__))
+        self.session = pyssh.Ssh(host=hostname)
+        self.session.open()
+        self.session.read_lazy()
+        self.session.write('cd %s\n' % os.getcwd())
+        self.session.read_lazy()
+        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
+        self.session.read_lazy()
+        self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
+        self.session.read_lazy()
+        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
+        self.session.read_lazy()
+        self.session.write('%s %s/scripts-%s/ppserver.py \n'%(sys.executable, ppprefix, sys.version.split()[0] ))
+        self.session.read_lazy()
+        self.session.read_lazy()
+        print 'started ppserver in ', hostname
+    def __del__(self):
+        self.session.close()
+        print 'killed ppserver in ', self.host
+
+# == EOF ====================================================================================

python/GaudiMP/__init__.py

+# File: GaudiMP/__init__.py
+# Author: the gaudi-mp team
+
+"""
+   GaudiMP main module.
+   It makes available a number of APIs and classes to be used by end user scripts
+
+   Usage:
+      import GaudiMP
+"""

python/GaudiMP/pTools.py

+from GaudiPython import gbl, SUCCESS, FAILURE
+from multiprocessing import Event
+import pickle, time
+
+# Eoin Smith
+# 3 Aug 2010
+
+#
+# This script contains the ancillary classes and functions used in the
+#  GaudiPython Parallel model
+#
+# Classes :
+# - HistoAgent : In charge of extracting Histograms from their Transient Store
+#                 on a reader/worker, communicating them to the writer, and on
+#                 the writer receiving them and rebuilding a single store
+#
+# - FileRecordsAgent : Similar to HistoAgent, but for FileRecords Data
+#
+# - LumiFSR : FSR data from different workers needs to be carefully merged to
+#              replicate the serial version; this class aids in that task by
+#              representing an LHCb LumiFSR object as a python class
+#
+# - PackedCaloHypos : Pythonization of an LHCb class, used for inspecting some
+#                      differences in serial and parallel output
+#
+# - Syncer : This class is responsible for syncing processes for a specified
+#             section of execution.  For example, one Syncer object might be
+#             syncing Initialisation, one for Running, one for Finalisation.
+#             Syncer uses multiprocessing.Event() objects as flags which are
+#             visible across the N processes sharing them.
+#             IMPORTANT : The Syncer objects in the GaudiPython Parallel model
+#              ensure that there is no hanging; they in effect, allow a timeout
+#              for Initialisation, Run, Finalise on all processes
+#
+# - SyncMini : A class wrapper for a multiprocessing.Event() object
+#
+# Methods :
+# - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
+#                          ( AppMgr().evtsvc() ) to this to get the current
+#                          Event Number as an integer (even from RawEvents!)
+#
+
+
+# used to convert stored histos (in AIDA format) to ROOT format
+aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
+
+# =========================== Classes =========================================
+
+class HistoAgent( ) :
+    def __init__( self, gmpComponent ) :
+        self._gmpc = gmpComponent
+        self.hvt = self._gmpc.hvt
+        self.histos = []
+        self.qin = self._gmpc.hq
+        self.log = self._gmpc.log
+
+        # There are many methods for booking Histogram Objects to Histo store
+        # here they are collected in a dictionary, with key = a relevant name
+        self.bookingDict = {}
+        self.bookingDict['DataObject']        = self.bookDataObject
+        self.bookingDict['NTuple::Directory'] = self.bookDataObject
+        self.bookingDict['NTuple::File']      = self.bookDataObject
+        self.bookingDict['TH1D']       = self.bookTH1D
+        self.bookingDict['TH2D']       = self.bookTH2D
+        self.bookingDict['TH3D']       = self.bookTH3D
+        self.bookingDict['TProfile']   = self.bookTProfile
+        self.bookingDict['TProfile2D'] = self.bookTProfile2D
+
+    def register( self, tup ) :
+        # add a tuple of (worker-id, histoDict) to self.histos
+        assert tup.__class__.__name__ == 'tuple'
+        self.histos.append( tup )
+
+    def Receive( self ) :
+        hstatus = self._gmpc.nWorkers+1    # +1 for the Reader!
+        while True :
+            tup = self.qin.get()
+            if tup == 'HISTOS_SENT' :
+                self.log.debug('received HISTOS_SENT message')
+                hstatus -= 1
+                if not hstatus : break
+            else   :
+              self.register( tup )
+        self._gmpc.sEvent.set()
+        self.log.info('Writer received all histo bundles and set sync event')
+        return SUCCESS
+
+
+    def RebuildHistoStore( self ) :
+        '''
+        Rebuild the Histogram Store from the histos received by Receive()
+        If we have a histo which is not in the store,
+        book and fill it according to self.bookingDict
+        If we have a histo with a matching histo in the store,
+        add the two histos, remembering that aida2root must be used on
+        the Stored histo for compatibility.
+        '''
+        errors = 0
+        for tup in self.histos :
+            workerID, histDict = tup
+            added = 0 ; registered = 0; booked = 0
+            for n in histDict.keys() :
+                o = histDict[ n ]
+                obj = self.hvt.retrieve( n )
+                if obj :
+                    try    :
+                        aida2root(obj).Add(o)
+                    except :
+                        self.log.warning('FAILED TO ADD : %s'%(str(obj)))
+                        errors += 1
+                    added += 1
+                else :
+                    if o.__class__.__name__ in self.bookingDict.keys() :
+                        try    :
+                            self.bookingDict[o.__class__.__name__](n, o)
+                        except :
+                            self.log.warning('FAILED TO REGISTER : %s\tto%s'\
+                                             %(o.__class__.__name__, n))
+                            errors += 1
+                    else :
+                        self.log.warning( 'No booking method for: %s\t%s\t%s'\
+                                          %(n,type(o),o.__class__.__name__) )
+                        errors += 1
+                    booked += 1
+        hs = self.hvt.getHistoNames()
+        self.log.info( 'Histo Store Rebuilt : ' )
+        self.log.info( '  Contains %i objects.'%(len(hs)) )
+        self.log.info( '  Errors in Rebuilding : %i'%(errors) )
+        return SUCCESS
+
+
+    def bookDataObject( self, n, o ):
+        '''
+        Register a DataObject to the Histo Store
+        '''
+        self._gmpc.hvt.registerObject( n, o )
+
+    def bookTH1D( self, n, o ) :
+        '''
+        Register a ROOT 1D THisto to the Histo Store
+        '''
+        obj = self.hvt._ihs.book( n, o.GetTitle(),\
+                                     o.GetXaxis().GetNbins(),\
+                                     o.GetXaxis().GetXmin(),\
+                                     o.GetXaxis().GetXmax() )
+        aida2root(obj).Add(o)
+
+    def bookTH2D( self, n, o ) :
+        '''
+        Register a ROOT 2D THisto to the Histo Store
+        '''
+        obj = self.hvt._ihs.book( n, o.GetTitle(),\
+                                     o.GetXaxis().GetNbins(),\
+                                     o.GetXaxis().GetXmin(),\
+                                     o.GetXaxis().GetXmax(),\
+                                     o.GetYaxis().GetNbins(),\
+                                     o.GetYaxis().GetXmin(),\
+                                     o.GetYaxis().GetXmax() )
+        aida2root(obj).Add(o)
+
+    def bookTH3D( self, n, o ) :
+        '''
+        Register a ROOT 3D THisto to the Histo Store
+        '''
+        obj = self.hvt._ihs.book( n, o.GetTitle(),\
+                                     o.GetXaxis().GetXbins(),\
+                                     o.GetXaxis().GetXmin(),\
+                                     o.GetXaxis().GetXmax(),\
+                                     o.GetYaxis().GetXbins(),\
+                                     o.GetYaxis().GetXmin(),\
+                                     o.GetYaxis().GetXmax(),\
+                                     o.GetZaxis().GetXbins(),\
+                                     o.GetZaxis().GetXmin(),\
+                                     o.GetZaxis().GetXmax() )
+        aida2root(obj).Add(o)
+
+    def bookTProfile( self, n, o ) :
+        '''
+        Register a ROOT TProfile to the Histo Store
+        '''
+        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
+                                         o.GetXaxis().GetNbins(),\
+                                         o.GetXaxis().GetXmin(),\
+                                         o.GetXaxis().GetXmax(),\
+                                         o.GetOption() )
+        aida2root(obj).Add(o)
+
+    def bookTProfile2D( self, n, o ) :
+        '''
+        Register a ROOT TProfile2D to the Histo Store
+        '''
+        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
+                                         o.GetXaxis().GetNbins(),\
+                                         o.GetXaxis().GetXmin(),\
+                                         o.GetXaxis().GetXmax(),\
+                                         o.GetYaxis().GetNbins(),\
+                                         o.GetYaxis().GetXmin(),\
+                                         o.GetYaxis().GetXmax()  )
+        aida2root(obj).Add(o)
+
+# =============================================================================
+
+class FileRecordsAgent( ) :
+    def __init__( self, gmpComponent ) :
+        # gmpComponent : the owning GMP component; provides the FileRecords
+        #                store (fsr), the FileRecords queue (fq) and the
+        #                component logger.
+        self._gmpc = gmpComponent
+        self.fsr = self._gmpc.fsr
+        self.q   = self._gmpc.fq
+        self.log = self._gmpc.log
+        self.objectsIn  = []   # used for collecting FSR store objects
+        self.objectsOut = []   # (nodeID, path, pickled-object) tuples to send
+
+    def localCmp( self, tupA, tupB ) :
+        # sort tuples by a particular element
+        # for the sort() method
+        ind  = 0
+        valA = tupA[ind]
+        valB = tupB[ind]
+        if   valA<valB : return -1
+        elif valA>valB : return  1
+        else           : return  0
+
+
+    def SendFileRecords( self ) :
+        # send the FileRecords data as part of finalisation
+
+        # Take Care of FileRecords!
+        # There are two main things to consider here
+        # 1) The DataObjects in the FileRecords Transient Store
+        # 2) The fact that they are Keyed Containers, containing other objects
+        #
+        # The Lead Worker, nodeID=0, sends everything in the FSR store, as
+        #   a completeness guarantee,
+        #
+        # send in form ( nodeID, path, object)
+        self.log.info('Sending FileRecords...')
+        lst      = self.fsr.getHistoNames()
+
+        # Check Validity
+        if not lst :
+            self.log.info('No FileRecords Data to send to Writer.')
+            self.q.put( 'END_FSR' )
+            return SUCCESS
+
+        # no need to send the root node
+        if '/FileRecords' in lst : lst.remove('/FileRecords')
+
+        for l in lst :
+            o = self.fsr.retrieveObject( l )
+            if hasattr(o, "configureDirectAccess") :
+                o.configureDirectAccess()
+            # lead worker sends everything, as completeness guarantee
+            if self._gmpc.nodeID == 0 :
+                self.objectsOut.append( (0, l, pickle.dumps(o)) )
+            else :
+                # only add the Event Counter
+                # and non-Empty Keyed Containers (ignore empty ones)
+                if l == '/FileRecords/EventCountFSR' :
+                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
+                    self.objectsOut.append( tup )