Commits

bi...@4525493e-7705-40b1-a816-d608a930855b  committed a8de5b7

add support for metadata extraction for d3pds + speed up writing d3pds by having a static outputlist + implement clid-less itemlist

  • Participants
  • Parent commits 149cb21

Comments (0)

Files changed (13)

+2012-03-10  Sebastien Binet  <binet@voatlas51.cern.ch>
+
+	* tagging AthenaRootComps-00-10-03
+	* add support for metadata extraction for d3pds
+	* speedup writing d3pds by having a static outputlist
+	* implement clid-less itemlist
+	* A src/RootNtupleOutputStream.cxx
+	* A src/RootNtupleOutputStream.h
+	* M python/AthenaRootBase.py
+	* M python/OutputStreamAthenaRoot.py
+	* M share/tests/test_athena_ntuple_dumper.py
+	* M share/tests/test_athena_variable_shape_ntuple.py
+	* M src/RootConnection.cxx
+	* M src/RootNtupleEventSelector.cxx
+	* M src/RootNtupleEventSelector.h
+	* M src/components/AthenaRootComps_entries.cxx
+	* M test/run_test_athena_ntuple_dumper.sh
+	* M test/run_test_athena_variable_shape_ntuple.sh
+
 2012-03-08  Sebastien Binet  <binet@voatlas51.cern.ch>
 
 	* tagging AthenaRootComps-00-10-02

File python/AthenaRootBase.py

           import AtlasSealCLHEP.OldCLHEPStreamers
      except ImportError:
           msg.info("could not load AtlasSealCLHEP")
-
+          pass
+     
+     if not hasattr (svcMgr, 'MetaDataStore'):
+          svcMgr += CfgMgr.StoreGateSvc ("MetaDataStore")
+          pass
+     
      msg.debug( "Loading basic services for AthenaRoot... [DONE]" )
      return
 

File python/OutputStreamAthenaRoot.py

 from AthenaRootCompsConf import Athena__RootOutputStreamTool as AthenaRootOutputStreamTool
 
 def createNtupleOutputStream(streamName, fileName, tupleName="physics", asAlg = False):
-    theApp.OutStreamType = "AthenaOutputStream"
     if asAlg:
         from AthenaCommon.AlgSequence import AlgSequence
         topSequence = AlgSequence()
         pass
-
+    else:
+        theApp.OutStreamType = "Athena::RootNtupleOutputStream"
+        theApp.OutStreamType = "AthenaOutputStream"
+        pass
+    
     # Now do standard output stream
     writingTool1 = AthenaRootOutputStreamTool( streamName + "Tool" )
     writingTool1.TupleName = tupleName
     #writingTool1.DataHeaderSatellites = [ "basic/:EventInfo#*" ]
     writingTool1.OutputFile = fileName
     #writingTool1.
-    outputStream = AthenaRootOutputStreamProtect(
+    cls = AthenaRootNtupleOutputStreamProtect
+    #cls = CfgMgr.Athena__RootNtupleOutputStream
+    outputStream = cls(
         streamName,
         WritingTool = writingTool1,
-        ItemList = ["unsigned int#RunNumber",
-                    "unsigned int#EventNumber",],
+        ItemList = ["RunNumber",
+                    "EventNumber",],
         #StrictClidMatching = False,
         )
     if asAlg:
     OutputFile = property(_get_output_file, _set_output_file, "fwd doc...")
     pass
 
+from AthenaCommon import CfgMgr
+class AthenaRootNtupleOutputStreamProtect(CfgMgr.Athena__RootNtupleOutputStream):  # configurable whose OutputFile attribute is routed through the property defined at the bottom of the class
+    def __init__(self, name='Stream1', **kw):
+        kw['name'] = kw.get('name', name)  # an explicit name= keyword wins over the positional/default name
+        super(AthenaRootNtupleOutputStreamProtect, self).__init__(**kw)
+        return
+
+    def _set_output_file(self, fname):  # setter half of the OutputFile property
+        self._properties['OutputFile'].__set__(self, fname)  # store the value through this configurable's own 'OutputFile' property entry
+        CfgMgr.Athena__RootNtupleOutputStream("%s_FH" % (self._name,)).OutputFile = fname  # also set OutputFile on the configurable named "<name>_FH"; NOTE(review): confirm that companion is really meant to be an Athena__RootNtupleOutputStream
+        return
+
+    def _get_output_file(self):  # getter half of the OutputFile property
+        return self._properties['OutputFile'].__get__(self)
+
+    OutputFile = property(_get_output_file, _set_output_file, "fwd doc...")  # expose the getter/setter pair above as the OutputFile attribute
+    pass
+
                                                     

File share/tests/test_athena_ntuple_dumper.py

     import AthenaRootComps.WriteAthenaRoot as arcw
     out = arcw.createNtupleOutputStream("StreamD3PD", "d3pd.root", "egamma")
     out.ItemList += [
-        "int#el_n",
-        "std::vector<float>#el_eta",
-        "std::vector<std::vector<float> >#el_jetcone_dr",
+        "el_n",
+        "el_eta",
+        "el_jetcone_dr",
         ]
     out.ForceRead = True

File share/tests/test_athena_variable_shape_ntuple.py

     out = arcw.createNtupleOutputStream("StreamD3PD", "d3pd.root", "egamma")
     if not 'OUTBRANCHES' in dir():
         OUTBRANCHES=[
-            "int#el_n",
-            "std::vector<float>#el_eta",
-            "std::vector<std::vector<float> >#el_jetcone_dr",
+            "el_n",
+            "el_eta",
+            "el_jetcone_dr",
             ]
     out.ItemList += OUTBRANCHES
     out.ForceRead = True

File src/RootConnection.cxx

 StatusCode
 RootConnection::commit()
 {
-  if (m_file == 0 || m_tree == 0) {
+  if (m_file == 0 || m_tree == 0 || m_branch == 0) {
     return StatusCode::FAILURE;
   }
 
   if (m_file == 0 || m_tree == 0) {
     return StatusCode::FAILURE;
   }
+
   const std::string& name = container;
   m_branch = m_tree->GetBranch(name.c_str());
   if (m_branch == 0 && m_file->IsWritable()) {

File src/RootNtupleEventSelector.cxx

 #include <stdint.h>
 
 // ROOT includes
+#include "TROOT.h"
 #include "TBranchElement.h"
 #include "TClass.h"
 #include "TClassEdit.h"
 #include "TFile.h"
+#include "TKey.h"
 #include "TLeaf.h"
-#include "TROOT.h"
 #include "TTree.h"
 
 // Framework includes
 RootNtupleEventSelector::RootNtupleEventSelector( const std::string& name,
                                                   ISvcLocator* svcLoc ) :
   AthService ( name,    svcLoc ),
-  m_storeGate( "StoreGateSvc", name ),
+  m_dataStore( "StoreGateSvc/StoreGateSvc", name ),
+  m_metaStore( "StoreGateSvc/MetaDataStore", name ),
   m_clidsvc  ( "ClassIDSvc",   name ),
   m_dictsvc  ( "AthDictLoaderSvc", name ),
   m_nbrEvts  ( 0 ),
   m_tuple    (NULL),
   m_rootAddresses ()
 {
+  declareProperty( "DataStore",
+		   m_dataStore,
+		   "Store where to publish data");
+
+  declareProperty( "MetaStore",
+		   m_metaStore,
+		   "Store where to publish metadata");
+
   declareProperty( "InputCollections", 
                    m_inputCollectionsName,
                    "List of input (ROOT) file names" );
   // as our branches (which need a valid m_ntuple pointer)
   // may be asked to be registered as we are a ProxyProvider.
   // retrieving the event store will poke the ProxyProviderSvc...
-  if ( !m_storeGate.retrieve().isSuccess() ) {
+  if ( !m_dataStore.retrieve().isSuccess() ) {
     ATH_MSG_ERROR
-      ("Could not retrieve [" << m_storeGate.typeAndName() << "] !!");
+      ("Could not retrieve [" << m_dataStore.typeAndName() << "] !!");
+    return StatusCode::FAILURE;
+  }
+
+  // ditto for meta data store
+  if (!m_metaStore.retrieve().isSuccess()) {
+    ATH_MSG_ERROR
+      ("Could not retrieve [" << m_dataStore.typeAndName() << "] !!");
     return StatusCode::FAILURE;
   }
 
 StatusCode RootNtupleEventSelector::finalize()
 {
   ATH_MSG_INFO ("Finalize...");
-
-#if 0
-  if (msgLvl(MSG::DEBUG)) {
-    TObjArray *leaves = m_tuple->GetListOfLeaves();
-    if (leaves) {
-      // loop over leaves
-      for (Int_t i = 0; i < leaves->GetEntries(); ++i) {
-        TLeaf *leaf = (TLeaf *)leaves->At(i);
-        TBranch *branch = leaf->GetBranch();
-        if (branch) {
-          const char *brname = branch->GetName();
-          if (m_tuple->GetBranchStatus(brname)) {
-            ATH_MSG_DEBUG("branch [" << brname << "] was active.");
-          } else {
-            ATH_MSG_DEBUG("branch [" << brname << "] was NOT active.");
-          }
-        }
-      }
-    }
-  }
-
-  // Cleaning-up: make the tuple disappear from the tuple service's registery
-  if ( !m_tupleSvc->deReg( m_tuple ).isSuccess() ) {
-    ATH_MSG_WARNING 
-      ("Could not unregister our (chain of) tuple(s) !" << endreq
-       << "Tuple service may barf.");
-  }
-#endif
-
   // FIXME: this should be tweaked/updated if/when a selection function
   //        or filtering predicate is applied (one day?)
   ATH_MSG_INFO ("Total events read: " << (m_nbrEvts - m_skipEvts));
       }
 
       const bool forceRemove = false;
-      m_storeGate->clearStore(forceRemove).ignore();
+      m_dataStore->clearStore(forceRemove).ignore();
 
     } else {
       // end of collections
     // the ntuple if these branches exist...
     const uint32_t* runnbr = NULL;
     const uint32_t* evtnbr = NULL;
-    if (m_storeGate->contains<uint32_t>("RunNumber")) {
-      m_storeGate->retrieve(runnbr, "RunNumber").ignore();
+    if (m_dataStore->contains<uint32_t>("RunNumber")) {
+      m_dataStore->retrieve(runnbr, "RunNumber").ignore();
     }
-    if (m_storeGate->contains<uint32_t>("EventNumber")) {
-      m_storeGate->retrieve(evtnbr, "EventNumber").ignore();
+    if (m_dataStore->contains<uint32_t>("EventNumber")) {
+      m_dataStore->retrieve(evtnbr, "EventNumber").ignore();
     }
     // event info
     EventType* evtType = new EventType;
                                                      evtnbr ? 
                                                      *evtnbr : m_curEvt-1, 0 ),
                                         evtType );
-    if ( !m_storeGate->record( evtInfo, "TTreeEventInfo" ).isSuccess() ) {
+    if ( !m_dataStore->record( evtInfo, "TTreeEventInfo" ).isSuccess() ) {
       ATH_MSG_ERROR ("Could not record TTreeEventInfo !");
       delete evtInfo; evtInfo = 0;
       return StatusCode::FAILURE;
   // remove our EventInfo if any...
   // {
   //   const bool force_remove = true;
-  //   if (!m_storeGate->clearStore(force_remove).isSuccess()) {
+  //   if (!m_dataStore->clearStore(force_remove).isSuccess()) {
   //     ATH_MSG_ERROR("could not clear event store!");
   //     return StatusCode::FAILURE;
   //   } else {
-  //     ATH_MSG_INFO("sgdump: \n" << m_storeGate->dump()); 
+  //     ATH_MSG_INFO("sgdump: \n" << m_dataStore->dump()); 
   //   }
   // }
 
   return StatusCode::SUCCESS;
 }
 
+/// Record one address per branch of a metadata tree into @p store.
+///
+/// For every branch of @p tree a CLID is looked up: from the branch class
+/// name when ROOT knows the class, otherwise from the type of the branch's
+/// single leaf.  An Athena::RootBranchAddress is then recorded under the key
+/// "<prefix>/<branch-name>" (or just the branch name when @p prefix is
+/// empty).  Branches for which no CLID can be found are skipped.
+///
+/// @param store  store receiving the addresses; must not be null
+/// @param tree   metadata tree to scan; must not be null
+/// @param prefix key prefix put in front of each branch name
+/// @return FAILURE for a null @p store or @p tree, SUCCESS otherwise; a
+///         failing recordAddress is reported but does not abort the scan
+StatusCode
+RootNtupleEventSelector::createMetaDataRootBranchAddresses(StoreGateSvc *store,
+                                                           TTree *tree,
+                                                           const std::string& prefix) const
+{
+  if (0 == store) {
+    ATH_MSG_ERROR("null pointer to store !");
+    return StatusCode::FAILURE;
+  }
+
+  if (0 == tree) {
+    ATH_MSG_ERROR("null pointer to n-tuple !");
+    return StatusCode::FAILURE;
+  }
+
+  const std::string tree_name = tree->GetName();
+  TObjArray *branches = tree->GetListOfBranches();
+  if (!branches) {
+    ATH_MSG_INFO("no branches!!");
+    return StatusCode::SUCCESS;
+  }
+
+  // loop over branches; the branch list is not modified below, so the
+  // entry count is taken once
+  const Int_t nbranches = branches->GetEntries();
+  for (Int_t i = 0; i < nbranches; ++i) {
+    TBranch *branch = static_cast<TBranch*>(branches->At(i));
+    if (!branch) {
+      continue;
+    }
+
+    CLID id = 0;
+    // the address carries the tree pointer itself as its first parameter
+    const void* value_ptr = tree;
+    // name used in diagnostics: the class name of the branch, replaced by
+    // the leaf type for branches holding a built-in type
+    std::string type_name = branch->GetClassName();
+    const std::string br_name = branch->GetName();
+    const std::string sg_key = prefix.empty()
+      ? br_name
+      : prefix + "/" + br_name;
+
+    TClass *cls = NULL;
+    if (!type_name.empty()) {
+      cls = TClass::GetClass(type_name.c_str());
+    }
+
+    if (cls) {
+      const std::type_info *ti = cls->GetTypeInfo();
+      if (!ti) {
+        ATH_MSG_WARNING("could not find a type-info for [" <<
+                        type_name << "]");
+        continue;
+      }
+      // first, try to load a dict for that class...
+      m_dictsvc->load_type(*ti);
+
+      std::string ti_typename = System::typeinfoName(*ti);
+      if (!m_clidsvc->getIDOfTypeInfoName(ti_typename, id)
+          .isSuccess()) {
+        // try another one...
+        ti_typename = TClassEdit::ShortType(ti_typename.c_str(),
+                                            TClassEdit::kDropAllDefault);
+        if (!m_clidsvc->getIDOfTypeInfoName(ti_typename, id)
+            .isSuccess()) {
+          ATH_MSG_INFO("** could not find a CLID from type-info ["
+                       << System::typeinfoName(*ti) << "]");
+          ATH_MSG_INFO("** could not find a CLID from type-info-alias ["
+                       << ti_typename << "]");
+          continue;
+        }
+      }
+    } else {
+      // probably a built-in type: only single-leaf branches are handled
+      TObjArray *leaves = branch->GetListOfLeaves();
+      if (leaves &&
+          leaves->GetEntries() == 1) {
+        // update the outer name (no shadowing) so that every later
+        // diagnostic reports the leaf type instead of an empty string
+        type_name = static_cast<TLeaf*>(leaves->At(0))->GetTypeName();
+        if (!m_clidsvc->getIDOfTypeName(::root_typename(type_name), id)
+            .isSuccess()) {
+          ATH_MSG_INFO("** could not find a CLID for type-name ["
+                       << type_name << "]");
+          continue;
+        }
+      }
+    }
+
+    if (id == 0) {
+      // reached e.g. for class-less branches with zero or several leaves;
+      // name the branch too, as the type name may be empty in that case
+      ATH_MSG_INFO("** could not find a CLID for type-name ["
+                   << type_name << "] (branch [" << br_name << "])");
+      continue;
+    }
+
+    Athena::RootBranchAddress* addr = new Athena::RootBranchAddress
+      (ROOT_StorageType, id,
+       tree_name,
+       br_name,
+       (unsigned long)(value_ptr),
+       (unsigned long)(0));
+    if (!store->recordAddress(sg_key, addr, true).isSuccess()) {
+      ATH_MSG_ERROR("could not record address at [" << sg_key << "] in store ["
+                    << store->name() << "]");
+      // the address is deleted here and the scan goes on with the next branch
+      delete addr; addr = 0;
+    }
+    // SG::TransientAddress* taddr = new SG::TransientAddress
+    //   (id, sg_key, addr);
+    // taddr->setProvider(this);
+    // taddr->clearAddress(true);
+    // tads.push_back(taddr);
+  }
+  return StatusCode::SUCCESS;
+}
+
 TTree*
 RootNtupleEventSelector::fetchNtuple(const std::string& fname) const
 {
   // disable all branches
   tree->SetBranchStatus("*", 0);
 
+  if (!m_metaStore->clearStore().isSuccess()) {
+    ATH_MSG_INFO("could not clear store [" << m_metaStore.typeAndName() << "]");
+    return tree;
+  }
+
+  // metadata if any
+  TDirectoryFile *metadir = (TDirectoryFile*)f->Get((m_tupleName.value()+"Meta").c_str());
+  if (metadir) {
+    const TList *keys = metadir->GetListOfKeys();
+    for (Int_t i=0; i < keys->GetSize(); ++i) {
+      TKey* key = dynamic_cast<TKey*>(keys->At(i));
+      if (!key) {
+	continue;
+      }
+      TTree *metatree = dynamic_cast<TTree*>
+	(metadir->Get(TString(key->GetName()) +
+		      ";" +
+		      TString::Format("%hi", key->GetCycle())));
+      if (!metatree) {
+	continue;
+      }
+      const std::string metatree_name = metatree->GetName();
+      // load data...
+      if (metatree->GetEntry(0) < 0) {
+	ATH_MSG_INFO
+	  ("Problem retrieving data from metadata-tree [" << metatree_name
+	   << "] !!");
+	continue;
+      }
+      //
+      if (!createMetaDataRootBranchAddresses(&*m_metaStore, 
+					     metatree, 
+					     metatree_name).isSuccess()) {
+	ATH_MSG_INFO("could not create metadata for tree ["
+		     << metatree_name << "]");
+	continue;
+      }
+    }
+  }
   return tree;
 }
 

File src/RootNtupleEventSelector.h

   StatusCode createRootBranchAddresses(StoreID::type storeID,
                                        tadList &tads);
 
+  /// helper method to create proxies for the metadata store
+  StatusCode createMetaDataRootBranchAddresses(StoreGateSvc *store,
+   					       TTree *tree,
+   					       const std::string& prefix) const;
+
   /// helper method to retrieve the correct tuple
   TTree* fetchNtuple(const std::string& fname) const;
 
 
   typedef ServiceHandle<StoreGateSvc> StoreGateSvc_t;
   /// Pointer to the @c StoreGateSvc event store
-  StoreGateSvc_t m_storeGate;
+  StoreGateSvc_t m_dataStore;
+
+  /// Pointer to the @c StoreGateSvc metadata store
+  StoreGateSvc_t m_metaStore;
 
   typedef ServiceHandle<IClassIDSvc> ICLIDSvc_t;
   /// Pointer to the @c IClassIDSvc
   typedef SG::unordered_set<SG::TransientAddress*> Addrs_t;
   // the list of transient addresses we "manage" or know about
   // these addresses are the actual TTree's branch names
+  // for the event data
   Addrs_t m_rootAddresses;
+
+  // the list of transient addresses we "manage" or know about
+  // these addresses are the actual TTree's branch names
+  // for the metadata tree(s)
+  //Addrs_t m_rootAddressesMetaData;
 }; 
 
 /////////////////////////////////////////////////////////////////// 

File src/RootNtupleOutputStream.cxx

+#include <cassert>
+#include <string>
+#include <vector>
+#include <fnmatch.h>
+
+// Framework include files
+#include "GaudiKernel/GaudiException.h"
+#include "GaudiKernel/Tokenizer.h"
+#include "GaudiKernel/AlgFactory.h"
+#include "GaudiKernel/IAlgManager.h"
+#include "GaudiKernel/ISvcLocator.h"
+#include "GaudiKernel/IOpaqueAddress.h"
+#include "GaudiKernel/IProperty.h"
+#include "GaudiKernel/ClassID.h"
+
+#include "AthenaKernel/IClassIDSvc.h"
+#include "AthenaKernel/IAthenaOutputTool.h"
+#include "AthenaKernel/IAthenaOutputStreamTool.h"
+
+#include "StoreGate/StoreGateSvc.h"
+#include "SGTools/DataProxy.h"
+#include "SGTools/ProxyMap.h"
+#include "SGTools/SGIFolder.h"
+
+#include "RootNtupleOutputStream.h"
+
+namespace Athena {
+
+/* Standard constructor.
+ *
+ * Forwards name and service locator to FilteredAlgorithm, sets the default
+ * handles (store "StoreGateSvc", "ClassIDSvc", and a streamer tool with
+ * type/name "AthenaOutputStreamTool/<name>Tool" created with this algorithm
+ * as parent) and declares the job-option properties together with their
+ * default values.  m_events is not set here; initialize() resets it.
+ */
+RootNtupleOutputStream::RootNtupleOutputStream(const std::string& name, ISvcLocator* pSvcLocator)
+  : FilteredAlgorithm(name, pSvcLocator),
+    m_dataStore("StoreGateSvc", name),
+    m_pCLIDSvc("ClassIDSvc", name),
+    // m_p2BWritten(string("SG::Folder/") + name + string("_TopFolder"), this),
+    // m_decoder(string("SG::Folder/") + name + string("_excluded"), this),
+    m_streamer(std::string("AthenaOutputStreamTool/") + 
+	       name + std::string("Tool"), this),
+   m_helperTools(this) 
+{
+  assert(pSvcLocator);
+  declareProperty("ItemList",               m_itemList); // patterns matched against proxy names in collectAllObjects()
+  declareProperty("OutputFile",             m_outputName="DidNotNameOutput.root"); // placeholder default file name
+  declareProperty("EvtConversionSvc",       m_persName="EventPersistencySvc");
+  declareProperty("WritingTool",            m_streamer);
+  declareProperty("Store",                  m_dataStore);
+  declareProperty("ProcessingTag",          m_processTag=name); // defaults to the algorithm name
+  declareProperty("ForceRead",              m_forceRead=false);
+  declareProperty("PersToPers",             m_persToPers=false);
+  declareProperty("ExemptPersToPers",       m_exemptPersToPers);
+  declareProperty("ProvideDef",             m_provideDef=false);
+  declareProperty("WriteOnExecute",         m_writeOnExecute=true); // by default write() is called from execute()
+  declareProperty("WriteOnFinalize",        m_writeOnFinalize=false);
+  declareProperty("TakeItemsFromInput",     m_itemListFromTool=false);
+  // declareProperty("CheckNumberOfWrites",    m_checkNumberOfWrites=true);
+  declareProperty("CommitOnStop",           m_commitOnStop=false);
+  //declareProperty("ExcludeList",            m_excludeList);
+  declareProperty("HelperTools",            m_helperTools);
+
+  declareProperty("DynamicItemList",
+		  m_dynamicItemList = false,
+		  "dynamic output itemlist:\n" \
+		  "  if enabled rediscover object list to be written out at each event\n" \
+		  "  otherwise: reuse the one from the first event.");
+}
+
+// Standard destructor: the body is empty, nothing is released explicitly here
+RootNtupleOutputStream::~RootNtupleOutputStream() 
+{}
+
+/* Initialize the data writer.
+ *
+ * Steps, in order: initialize the FilteredAlgorithm base class, reset the
+ * written-events counter, retrieve the data store, the CLID service and the
+ * streamer tool, connect the streamer to the store and persistency service,
+ * retrieve the helper tools and call postInitialize() on each of them, and
+ * finally (only when WriteOnFinalize is set) register as a listener for the
+ * "MetaDataStop" incident.
+ *
+ * Returns FAILURE as soon as one of these steps fails (the helper tools are
+ * all given their postInitialize() call before the failure is reported),
+ * SUCCESS otherwise.
+ */
+StatusCode 
+RootNtupleOutputStream::initialize() 
+{
+  ATH_MSG_DEBUG("In initialize");
+  if (!this->FilteredAlgorithm::initialize().isSuccess()) {
+    ATH_MSG_ERROR("could not initialize base class");
+    return StatusCode::FAILURE;
+  }
+
+  // Reset the number of events written
+  m_events = 0;
+
+  // set up the SG service:
+  if (!m_dataStore.retrieve().isSuccess()) {
+    ATH_MSG_FATAL("Could not locate default store");
+    return StatusCode::FAILURE;
+  } else {
+    ATH_MSG_DEBUG("Found " << m_dataStore.type() << " store.");
+  }
+  assert(static_cast<bool>(m_dataStore));
+
+  // set up the CLID service:
+  if (!m_pCLIDSvc.retrieve().isSuccess()) {
+    ATH_MSG_FATAL("Could not locate default ClassIDSvc");
+    return StatusCode::FAILURE;
+  }
+  assert(static_cast<bool>(m_pCLIDSvc));
+
+  // Get Output Stream tool for writing
+  if (!m_streamer.retrieve().isSuccess()) {
+    ATH_MSG_FATAL("Can not find " << m_streamer);
+    return StatusCode::FAILURE;
+  }
+   
+  const bool extendProvenanceRecord = true;
+  if (!m_streamer->connectServices(m_dataStore.type(), 
+				   m_persName, 
+				   extendProvenanceRecord).isSuccess()) {
+    ATH_MSG_FATAL("Unable to connect services");
+    return StatusCode::FAILURE;
+  }
+
+  if (!m_helperTools.retrieve().isSuccess()) {
+    ATH_MSG_FATAL("Can not find " << m_helperTools);
+    return StatusCode::FAILURE;
+  }
+  ATH_MSG_INFO("Found " << m_helperTools << endreq << "Data output: " << m_outputName);
+
+  bool allgood = true; // cleared if any helper tool fails; every tool is still called
+  for (std::vector<ToolHandle<IAthenaOutputTool> >::const_iterator 
+	 iter = m_helperTools.begin(),
+	 iend = m_helperTools.end();
+       iter != iend; 
+       ++iter) {
+    if (!(*iter)->postInitialize().isSuccess()) {
+      allgood = false;
+    }
+  }
+  if (!allgood) {
+    ATH_MSG_ERROR("problem in postInitialize of a helper tool");
+    return StatusCode::FAILURE;
+  }
+
+  // For 'write on finalize', we set up a listener for the 'MetaDataStop'
+  // incident and perform the write in handle(). NOTE(review): presumably
+  // fired at 'stop' of the event selector - confirm. RDS 04/2010
+  if (m_writeOnFinalize) { /* NOTE(review): handle() also runs the helper
+                              tools' preFinalize() and the CommitOnStop
+                              path, but this is the only listener
+                              registration visible here and it requires
+                              WriteOnFinalize - confirm this is intended. */
+    // Register as listener for the incident handled in handle()
+    ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", this->name());
+    if (!incSvc.retrieve().isSuccess()) {
+      ATH_MSG_ERROR("Cannot get the IncidentSvc");
+      return StatusCode::FAILURE;
+    } else {
+      ATH_MSG_DEBUG("Retrieved IncidentSvc");
+    }
+    incSvc->addListener(this, "MetaDataStop", 50);
+    ATH_MSG_DEBUG("Added MetaDataStop listener");
+  }
+  ATH_MSG_DEBUG("End initialize");
+  return StatusCode::SUCCESS;
+}
+
+/// Incident callback.
+///
+/// On a "MetaDataStop" incident: calls preFinalize() on every helper tool,
+/// then either writes (WriteOnFinalize) or, failing that, performs a last
+/// connect + commit (CommitOnStop).  Problems are reported but not
+/// propagated.  The number of records written is logged for any incident.
+void
+RootNtupleOutputStream::handle(const Incident& inc)
+{
+  ATH_MSG_DEBUG("handle() incident type: " << inc.type());
+
+  static const std::string s_stopIncident = "MetaDataStop";
+  const bool isStop = (inc.type() == s_stopIncident);
+
+  if (isStop) {
+    // preFinalize of the helper tools happens at stop so that the output
+    // file can be optimized in finalize()
+    typedef std::vector<ToolHandle<IAthenaOutputTool> >::const_iterator ToolIter_t;
+    ToolIter_t tool = m_helperTools.begin();
+    const ToolIter_t lastTool = m_helperTools.end();
+    while (tool != lastTool) {
+      if (!(*tool)->preFinalize().isSuccess()) {
+        ATH_MSG_ERROR("Cannot finalize helper tool");
+      }
+      ++tool;
+    }
+
+    if (m_writeOnFinalize) {
+      // write() both streams and commits
+      const bool written = write().isSuccess();
+      if (!written) {
+        ATH_MSG_ERROR("Cannot write on finalize");
+      }
+    } else if (m_commitOnStop) {
+      // no write on finalize: do the final commit if it was requested
+      if (m_streamer->connectOutput(m_outputName).isFailure()) {
+        ATH_MSG_ERROR("Cannot connect to output file: " << m_outputName);
+      } else {
+        if (m_streamer->commitOutput().isFailure()) {
+          ATH_MSG_ERROR("commit failed!");
+        } else {
+          ATH_MSG_INFO("Did final commit");
+        }
+      }
+    }
+  }
+
+  ATH_MSG_INFO("Records written: " << m_events);
+  ATH_MSG_DEBUG("Leaving handle");
+}
+
+/// Terminate the data writer.
+///
+/// Finalizes the streamer output, then releases the helper tools and the
+/// streamer tool.  All three steps are always attempted; FAILURE is
+/// returned if any of them did not succeed.
+StatusCode
+RootNtupleOutputStream::finalize()
+{
+  ATH_MSG_DEBUG("finalize: Optimize output");
+  const bool outputOk = m_streamer->finalizeOutput().isSuccess();
+  ATH_MSG_DEBUG("finalize: end optimize output");
+
+  const bool helpersOk = m_helperTools.release().isSuccess();
+  const bool streamerOk = m_streamer.release().isSuccess();
+
+  if (outputOk && helpersOk && streamerOk) {
+    return StatusCode::SUCCESS;
+  }
+  return StatusCode::FAILURE;
+}
+
+/// Per-event entry point.
+///
+/// Calls preExecute() on every helper tool, writes the event when
+/// WriteOnExecute is set, then calls postExecute() on every helper tool.
+/// No step is skipped because of an earlier problem; FAILURE is returned
+/// if at least one step did not succeed.
+StatusCode
+RootNtupleOutputStream::execute()
+{
+  typedef std::vector<ToolHandle<IAthenaOutputTool> >::const_iterator ToolIter_t;
+  unsigned int nproblems = 0;
+
+  for (ToolIter_t tool = m_helperTools.begin(), last = m_helperTools.end();
+       tool != last;
+       ++tool) {
+    if (!(*tool)->preExecute().isSuccess()) {
+      ++nproblems;
+    }
+  }
+
+  if (m_writeOnExecute && !write().isSuccess()) {
+    ++nproblems;
+  }
+
+  for (ToolIter_t tool = m_helperTools.begin(), last = m_helperTools.end();
+       tool != last;
+       ++tool) {
+    if (!(*tool)->postExecute().isSuccess()) {
+      ++nproblems;
+    }
+  }
+
+  if (nproblems != 0) {
+    return StatusCode::FAILURE;
+  }
+  return StatusCode::SUCCESS;
+}
+
+/// Work entry point: stream and commit the objects of the current event.
+///
+/// Nothing is written, and SUCCESS is returned, when the event is not
+/// accepted or when the output cannot be connected.  Otherwise the objects
+/// are collected, streamed and committed; a non-recoverable streaming
+/// failure or a commit failure yields FAILURE.  The written-events counter
+/// is only incremented when both steps went fine.
+StatusCode
+RootNtupleOutputStream::write()
+{
+  // start from a clean object list
+  clearSelection();
+
+  if (!isEventAccepted()) {
+    return StatusCode::SUCCESS;
+  }
+  if (!m_streamer->connectOutput(m_outputName).isSuccess()) {
+    return StatusCode::SUCCESS;
+  }
+
+  bool ok = true;
+
+  // gather the objects to write, then hand them to the streamer
+  collectAllObjects();
+  StatusCode streamed = m_streamer->streamObjects(m_objects);
+  if (!streamed.isSuccess()) {
+    if (streamed.isRecoverable()) {
+      ATH_MSG_DEBUG("streamObjects failed.");
+    } else {
+      ATH_MSG_FATAL("streamObjects failed.");
+      ok = false;
+    }
+  }
+
+  // the commit is attempted whatever the outcome of the streaming
+  StatusCode committed = m_streamer->commitOutput();
+  if (!committed.isSuccess()) {
+    ATH_MSG_FATAL("commitOutput failed.");
+    ok = false;
+  }
+
+  clearSelection();
+
+  if (!ok) {
+    return StatusCode::FAILURE;
+  }
+  ++m_events;
+  return StatusCode::SUCCESS;
+}
+
+// Empty the list of collected objects; the cached (CLID, key) selection is
+// emptied as well, but only in dynamic item-list mode
+inline
+void
+RootNtupleOutputStream::clearSelection()
+{
+  m_objects.clear();
+  if (!m_dynamicItemList) {
+    return;
+  }
+  m_selection.clear();
+}
+
+namespace {
+  /// true if @p name matches at least one of the glob @p patterns
+  /// (fnmatch with FNM_PATHNAME)
+  bool matchesAnyPattern(const std::vector<std::string>& patterns,
+                         const std::string& name)
+  {
+    for (std::vector<std::string>::const_iterator
+           ipat = patterns.begin(),
+           iend = patterns.end();
+         ipat != iend;
+         ++ipat) {
+      if (0 == fnmatch(ipat->c_str(), name.c_str(), FNM_PATHNAME)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
+
+/// Collect the data objects to be streamed for the current event.
+///
+/// The (CLID, key) selection is (re)built from the proxies of the data
+/// store when DynamicItemList is set or when no selection is cached yet;
+/// otherwise the cached one is reused.  ItemList entries are glob patterns
+/// matched against the proxy names:
+///   - "name" or "+name": select matching proxies
+///   - "-name":           reject matching proxies
+/// A proxy is selected when it matches at least one selecting pattern and
+/// no rejecting pattern, whatever the order of the entries in the list.
+/// Empty entries are ignored.
+///
+/// For every selected proxy found in the store, the data is read first if
+/// ForceRead is set and the proxy is valid, and the proxy's object (if
+/// any) is appended to m_objects.
+void
+RootNtupleOutputStream::collectAllObjects()
+{
+  typedef std::vector<std::pair<CLID, std::string> > Items_t;
+  typedef std::vector<std::string> Keys_t;
+
+  if (m_dynamicItemList || m_selection.empty()) {
+    // FIXME: TakeItemsFromInput (m_itemListFromTool) is not honoured here.
+
+    // Split the item list once, before looking at any proxy.  Gathering the
+    // rejected patterns while scanning the list per proxy (stopping at the
+    // first accepting pattern) would miss every "-pattern" listed after a
+    // matching entry and would duplicate the others once per proxy.
+    const Keys_t& items = m_itemList.value();
+    Keys_t accepted;
+    Keys_t rejected;
+    accepted.reserve(items.size());
+    rejected.reserve(items.size());
+    for (Keys_t::const_iterator
+           ikey = items.begin(),
+           iend = items.end();
+         ikey != iend;
+         ++ikey) {
+      if (ikey->empty()) {
+        continue;
+      }
+      const char tag = (*ikey)[0];
+      if (tag == '-') {
+        rejected.push_back(ikey->substr(1, std::string::npos));
+      } else if (tag == '+') {
+        accepted.push_back(ikey->substr(1, std::string::npos));
+      } else {
+        accepted.push_back(*ikey);
+      }
+    }
+
+    typedef std::vector<const SG::DataProxy*> Proxies_t;
+    const Proxies_t proxies = m_dataStore->proxies();
+
+    Items_t selection;
+    selection.reserve(items.size());
+    for (Proxies_t::const_iterator
+           iproxy = proxies.begin(),
+           iend = proxies.end();
+         iproxy != iend;
+         ++iproxy) {
+      const SG::DataProxy *proxy = *iproxy;
+      if (!proxy) {
+        continue;
+      }
+      const std::string name = proxy->name();
+      if (matchesAnyPattern(accepted, name) &&
+          !matchesAnyPattern(rejected, name)) {
+        selection.push_back(std::make_pair(proxy->clID(), name));
+      }
+    }
+    m_selection = selection;
+  }
+
+  for (Items_t::const_iterator
+         itr = m_selection.begin(),
+         iend = m_selection.end();
+       itr != iend;
+       ++itr) {
+    const std::string &name = itr->second;
+    SG::DataProxy *proxy = m_dataStore->proxy(itr->first, name);
+    if (NULL == proxy) {
+      continue;
+    }
+    // The data is read in the same way with and without PersToPers (the
+    // ExemptPersToPers filter is not implemented), hence a single branch.
+    if (m_forceRead && proxy->isValid()) {
+      if (NULL == proxy->accessData()) {
+        ATH_MSG_ERROR(" Could not get data object for id "
+                      << proxy->clID() << ",\"" << proxy->name() << "\"");
+      }
+    }
+    DataObject *obj = proxy->object();
+    if (NULL != obj) {
+      m_objects.push_back(obj);
+      ATH_MSG_DEBUG(" Added object " << proxy->clID() << ",\"" << proxy->name() << "\"");
+      // NOTE: no per-object write counting is done here (the
+      // CheckNumberOfWrites property is commented out in the constructor).
+    }
+  }
+
+  return;
+}
+
+} //> ns Athena
+

File src/RootNtupleOutputStream.h

+// Dear emacs, this is -*- C++ -*-
+#ifndef ATHENAROOTCOMPS_ROOTNTUPLEOUTPUTSTREAM_H
+#define ATHENAROOTCOMPS_ROOTNTUPLEOUTPUTSTREAM_H
+
+// STL include files
+#include <memory>
+#include <map>
+#include <set>
+#include <vector>
+#include <string>
+
+// fwk includes
+#include "GaudiKernel/IDataSelector.h"
+#include "GaudiKernel/ClassID.h"
+#include "GaudiKernel/ServiceHandle.h"
+#include "GaudiKernel/ToolHandle.h"
+#include "GaudiKernel/IIncidentListener.h"
+#include "AthenaBaseComps/FilteredAlgorithm.h"
+
+// forward declarations
+template <class ConcreteAlgorithm> class AlgFactory;
+class IClassIDSvc;
+class StoreGateSvc;
+class IAthenaOutputStreamTool;
+class IAthenaOutputTool;
+
+namespace SG {
+   class DataProxy;
+   class IFolder;
+   class FolderItem;
+}
+
+namespace Athena {
+
+/** @class Athena::RootNtupleOutputStream
+   * @brief Algorithm that marks data objects in SG for writing.
+   * @details Derives from FilteredAlgorithm and implements IIncidentListener.
+   * @author binet@cern.ch
+   * $Id$
+   */
+class RootNtupleOutputStream 
+  : virtual public IIncidentListener,
+            public FilteredAlgorithm
+{
+  friend class AlgFactory<Athena::RootNtupleOutputStream>;
+
+public:
+  typedef std::vector<SG::DataProxy*>     Items;
+
+protected:
+  /// Handle to the @c StoreGateSvc store where the data we want to
+  /// write out resides.
+  ServiceHandle<StoreGateSvc> m_dataStore;
+
+  /// Name of the persistency service capable of writing data from the store
+  std::string              m_persName;
+  /// Name of the OutputStreamTool used for writing
+  StringProperty           m_writingTool;
+  /// Name of the output file
+  std::string              m_outputName;
+  /// Tag of the processing stage
+  StringProperty           m_processTag;
+   
+  typedef ServiceHandle<IClassIDSvc> IClassIDSvc_t;
+  IClassIDSvc_t m_pCLIDSvc;
+   
+  /// Vector of item names
+  StringArrayProperty      m_itemList;
+  /// Collection of objects being selected (exposed through selectedObjects())
+  IDataSelector            m_objects;
+  /// Number of events written to this output stream
+  int                      m_events;
+  /// Set to true to force read of data objects in item list
+  bool m_forceRead;
+  /// Set to true to allow data objects being copied persistent to persistent (without SG retrieve).
+  bool m_persToPers;
+  std::vector<unsigned int> m_exemptPersToPers;
+  /// Set to true to allow defaults being provided for non-existent data objects.
+  bool m_provideDef;
+  /// Set to true to trigger streaming of data on execute()
+  bool m_writeOnExecute;
+  /// Set to true to trigger streaming of data on finalize()
+  bool m_writeOnFinalize;
+  /// set to write out everything from input DataHeader
+  bool m_itemListFromTool;
+  // /// set to true to check for number of times each object is written
+  // bool m_checkNumberOfWrites;
+  /// Set to true to ensure an extra commit on stop
+  bool m_commitOnStop;
+  // /// map to record number of writes per object
+  // typedef std::map<std::string, unsigned int>  CounterMapType;
+  // CounterMapType  m_objectWriteCounter;
+
+  /// Dynamic output item list:
+  ///   if enabled, rediscover the list of objects to be written out at each event;
+  ///   otherwise, reuse the one from the first event.
+  bool m_dynamicItemList;
+
+  /// List of selected proxies, stored as (CLID, string) pairs. NOTE(review): the string is presumably the SG key - confirm in the .cxx.
+  std::vector<std::pair<CLID, std::string> > m_selection;
+
+  /// Tool handle to the IAthenaOutputStreamTool (not a raw pointer)
+  ToolHandle<IAthenaOutputStreamTool> m_streamer;
+  /// Array of IAthenaOutputTool helper tools that are executed by this stream
+  ToolHandleArray<IAthenaOutputTool> m_helperTools;
+
+protected:
+  /// Standard algorithm constructor (protected; AlgFactory is declared a friend of this class)
+  RootNtupleOutputStream(const std::string& name, ISvcLocator* pSvcLocator); 
+  /// Standard destructor
+  virtual ~RootNtupleOutputStream();
+
+public:
+  typedef std::vector<std::pair<std::string, std::string> > TypeKeyPairs;
+  /// \name implement IAlgorithm
+  //@{
+  virtual StatusCode initialize();
+  virtual StatusCode finalize();
+  virtual StatusCode execute();
+  //@}
+  /// Stream the data
+  virtual StatusCode write();
+
+private:
+  /// Clear the list of selected objects
+  void clearSelection();
+  /// Collect data objects for the output streamer list
+  void collectAllObjects();
+  /// Return a pointer to the collection of selected objects (m_objects)
+  IDataSelector* selectedObjects() {
+    return &m_objects;
+  }
+  /// Incident handler, listening for LastInputFile
+  void handle(const Incident& incident);
+};
+
+} //> ns Athena
+
+#endif // ATHENAROOTCOMPS_ROOTNTUPLEOUTPUTSTREAM_H

File src/components/AthenaRootComps_entries.cxx

 #include "../RootSvc.h"
 
 #include "../RootNtupleEventSelector.h"
+#include "../RootNtupleOutputStream.h"
 
 #include "../RootAsciiDumperAlg.h"
 #include "../RootAsciiDumperAlgHandle.h"
 DECLARE_NAMESPACE_TOOL_FACTORY( Athena, RootOutputStreamTool )
 
 DECLARE_NAMESPACE_SERVICE_FACTORY(Athena, RootNtupleEventSelector)
+DECLARE_NAMESPACE_ALGORITHM_FACTORY(Athena, RootNtupleOutputStream)
 
 DECLARE_NAMESPACE_ALGORITHM_FACTORY( Athena, RootAsciiDumperAlg )
 DECLARE_NAMESPACE_ALGORITHM_FACTORY( Athena, RootAsciiDumperAlgHandle )
   DECLARE_NAMESPACE_TOOL(Athena, RootOutputStreamTool)
 
   DECLARE_NAMESPACE_SERVICE(Athena, RootNtupleEventSelector)
+  DECLARE_NAMESPACE_ALGORITHM(Athena, RootNtupleOutputStream)
 
   DECLARE_NAMESPACE_ALGORITHM(Athena, RootAsciiDumperAlg)
   DECLARE_NAMESPACE_ALGORITHM(Athena, RootAsciiDumperAlgHandle)

File test/run_test_athena_ntuple_dumper.sh

 
     echo "::: run athena-ntuple-dumper..."
     time_cmd="/usr/bin/time -o d3pd.timing.log"
-    ($time_cmd athena.py AthenaRootComps/test_athena_ntuple_dumper.py >& d3pd.log.txt) || return 1
+    ($time_cmd athena.py AthenaRootComps/test_athena_ntuple_dumper.py >& d3pd.001.log.txt) || return 1
     /bin/cat d3pd.timing.log
     echo "::: comparing ascii dumps..."
-    diff -urN $reffile $chkfile
+    /bin/rm -f d3pd.ascii.diff
+    diff -urN $reffile $chkfile >& d3pd.ascii.diff
     sc=$?
     echo "::: comparing ascii dumps...[$sc]"
 
     echo "::: reading back n-tuple file..."
     /bin/mv d3pd.root rb.d3pd.root || return 1
     /bin/rm -f $chkfile > /dev/null
-    ($time_cmd athena.py -c'FNAMES=["rb.d3pd.root"]' AthenaRootComps/test_athena_ntuple_dumper.py >& d3pd.log.txt) || return 1
+    ($time_cmd athena.py -c'FNAMES=["rb.d3pd.root"]' AthenaRootComps/test_athena_ntuple_dumper.py >& d3pd.002.log.txt) || return 1
     /bin/cat d3pd.timing.log
     echo "::: comparing ascii dumps..."
-    diff -urN $reffile $chkfile
+    /bin/rm -f rb.d3pd.ascii.diff
+    diff -urN $reffile $chkfile >& rb.d3pd.ascii.diff
     sc=$?
     echo "::: comparing ascii dumps...[$sc]"
 

File test/run_test_athena_variable_shape_ntuple.sh

 {
 echo "::: generate f1.root..."
 athena.py \
-    -c 'EVTMAX=10; BRANCHES=["RunNumber", "EventNumber", "el_n", "el_eta"]; OUTBRANCHES=["int#el_n","std::vector<float>#el_eta"]' \
+    -c 'EVTMAX=10; BRANCHES=["RunNumber", "EventNumber", "el_n", "el_eta"]; OUTBRANCHES=["el_n","el_eta"]' \
     AthenaRootComps/test_athena_variable_shape_ntuple.py \
     >| log.001.txt \
     || return 1
 
 echo "::: generate f2.root..."
 athena.py \
-    -c 'EVTMAX=10; BRANCHES=["RunNumber", "EventNumber", "el_n", "el_eta"]; OUTBRANCHES=["int#el_n"]' \
+    -c 'EVTMAX=10; BRANCHES=["RunNumber", "EventNumber", "el_n", "el_eta"]; OUTBRANCHES=["el_n"]' \
     AthenaRootComps/test_athena_variable_shape_ntuple.py \
     >| log.002.txt \
     || return 1
 
 echo "::: generate f3.root..."
 athena.py \
-    -c 'EVTMAX=10; BRANCHES=["RunNumber", "EventNumber", "el_n", "el_eta"]; OUTBRANCHES=["int#el_n","std::vector<float>#el_eta"]' \
+    -c 'EVTMAX=10; BRANCHES=["RunNumber", "EventNumber", "el_n", "el_eta"]; OUTBRANCHES=["el_n","el_eta"]' \
     AthenaRootComps/test_athena_variable_shape_ntuple.py \
     >| log.003.txt \
     || return 1
 
 echo "::: generate merged.root..."
 athena.py \
-    -c 'EVTMAX=30; BRANCHES=["RunNumber", "EventNumber", "el_n", "el_eta"]; OUTBRANCHES=["int#el_n",]; FNAMES=["f1.root","f2.root","f3.root"]' \
+    -c 'EVTMAX=30; BRANCHES=["RunNumber", "EventNumber", "el_n", "el_eta"]; OUTBRANCHES=["el_n",]; FNAMES=["f1.root","f2.root","f3.root"]' \
     AthenaRootComps/test_athena_variable_shape_ntuple.py \
     >| log.004.txt \
     || return 1