Commits

Anonymous committed f5b01c5

preliminary support for d3pd reading with MP

  • Participants
  • Parent commits c40322c

Comments (0)

Files changed (2)

+2012-03-02  Sebastien Binet  <sebastien.binet@cern.ch>
+
+	* tagging AthenaMP-00-03-33
+	* preliminary support for d3pd reading with MP
+	* M python/PyComps.py
+
 2012-02-21  Wim Lavrijsen <WLavrijsen@lbl.gov>
         * define sc in MpMergeUtils for the case where there are no files to merge
 

File python/PyComps.py

         from AthenaCommon import CfgMgr
         from AthenaCommon.Configurable import Configurable
         cfgs = Configurable.allConfigurables
+        has_athena_outstream = not (CfgMgr.AthenaOutputStream is None)
+        has_athena_regstream = not (CfgMgr.RegistrationStream is None)
         for cfg in cfgs.itervalues():
-            if isinstance (cfg, CfgMgr.AthenaOutputStream):
+            if (has_athena_outstream and
+                isinstance (cfg, CfgMgr.AthenaOutputStream)):
                 if not ioreg.has_key(cfg.name()):
                     ioreg[cfg.name()] = dict()
                 try:
                     ioreg[cfg.name()][cfg.OutputFile] = ['<output>', None]
                 except AttributeError:
                     msg.warning("The stream %s has no OutputFile attribute. Skip adding to IoRegistry" % cfg.name())
-            if isinstance (cfg, CfgMgr.RegistrationStream):
+            if (has_athena_regstream and 
+                isinstance (cfg, CfgMgr.RegistrationStream)):
                 if not ioreg.has_key(cfg.name()):
                     ioreg[cfg.name()] = dict()
                 try:
 
             if hasattr(svcMgr,'ByteStreamInputSvc'):
                 totalnentries=trfutil.getCachedFileInfo(svcMgr.ByteStreamInputSvc.FullFileName,'nentries')
-            elif hasattr(svcMgr.EventSelector,'InputCollections'):
-                totalnentries=trfutil.getCachedFileInfo(svcMgr.EventSelector.InputCollections,'nentries')
+            elif hasattr(svcMgr, 'EventSelector'):
+                # root n-tuple
+                if (CfgMgr.Athena__RootNtupleEventSelector and
+                    isinstance(evtsel, CfgMgr.Athena__RootNtupleEventSelector)):
+                    for fname in list(evtsel.InputCollections):
+                        totalnentries.append(
+                            trfutil.ntup_entries(
+                                fname=fname,
+                                tree_names=evtsel.TupleName
+                                )
+                            )
+                    pass
+                # athena-pool
+                elif hasattr(svcMgr.EventSelector,'InputCollections'):
+                    totalnentries=trfutil.getCachedFileInfo(
+                        svcMgr.EventSelector.InputCollections,
+                        'nentries'
+                        )
 
             for nentry in totalnentries:
                 self._nevt += nentry
         if not self._do_round_robin: 
             # a shared queue to distribute event numbers
             item_nbr = int(maxevt) + int(self._ncpus) + 5
-            _debug ("QUEUE will have [%i] items: %i evts to process and %i Nones to notify \
-                    workers about the end of queue" % (item_nbr, maxevt, 5 + self._ncpus ) )
+            _debug ("QUEUE will have [%i] items: %i evts to process and %i Nones to notify" \
+                    "workers about the end of queue" % (item_nbr, maxevt, 5 + self._ncpus ) )
             self._event_queue = mp.Queue(item_nbr)
-            _info ("a shared queue with [%i] evts  for [%i] workers in parallel \
-                    ..." % (maxevt, self._ncpus) )
+            _info ("a shared queue with [%i] evts  for [%i] workers in parallel" \
+                    "..." % (maxevt, self._ncpus) )
             evt_list = list()
             #for i in xrange(maxevt - self._events_before_fork): # event order list
             #    evt_nbr = i + self._events_before_fork 
    #import sys
    #from AthenaCommon.Logging import log as msg
    msg.debug("setupEvtSelForSeekOps:")
+   if sys.modules.has_key('AthenaRootComps.ReadAthenaRoot'):
+       # athenarootcomps has seeking enabled by default
+       msg.info('=> Seeking enabled.')
+       return
+   
    if not sys.modules.has_key('AthenaPoolCnvSvc.ReadAthenaPool'):
       ## user did not import that module so we give up
       msg.info( "Cannot enable 'seeking' b/c module " + \
             return out
         ost1=os.times(); t1 = time.time();
         print_tuple = (pid, Utils.get_cpu(pid), evt, t1-t0) + tuple( map(operator.sub, ost1, ost0))
-        _debug("WORKER_EVENT_STAT: pid=%i cpu=%i evt=%i \
-                elap_time=%.6f user_time=%.6f system_time=%.6f \
-                cutime=%.6f cstime=%.6f elap_os_time=%.6f" % print_tuple )
+        _debug("WORKER_EVENT_STAT: pid=%i cpu=%i evt=%i " \
+                "elap_time=%.6f user_time=%.6f system_time=%.6f " \
+                "cutime=%.6f cstime=%.6f elap_os_time=%.6f" % print_tuple )
         _debug( "SO FAR THIS WORKER PROCESSED %i events: %s " % (_elm.nbr - _elm._events_before_fork, _elm.processed_evts) )
         evt = get_event()