Commits

Stephen Skory committed cf1db8d

Simplifying the inline logic.

Files changed (1)

yt/analysis_modules/halo_finding/rockstar/rockstar.py

 from os import path
 
 class InlineRunner(ParallelAnalysisInterface):
-    def __init__(self, num_writers):
+    def __init__(self):
         # If this is being run inline, num_readers == comm.size, always.
         psize = ytcfg.getint("yt", "__global_parallel_size")
         self.num_readers = psize
-        if num_writers is None:
-            self.num_writers =  psize
-        else:
-            self.num_writers = min(num_writers, psize)
-
-    def split_work(self, pool):
-        avail = range(pool.comm.size)
-        self.writers = []
-        self.readers = []
-        # If we're inline, everyone is a reader.
-        self.readers = avail[:]
-        if self.num_writers == pool.comm.size:
-            # And everyone is a writer!
-            self.writers = avail[:]
-        else:
-            # Everyone is not a writer.
-            # Cyclically assign writers which should approximate
-            # memory load balancing (depending on the mpirun call,
-            # but this should do it in most cases).
-            stride = int(ceil(float(pool.comm.size) / self.num_writers))
-            while len(self.writers) < self.num_writers:
-                self.writers.extend(avail[::stride])
-                for r in avail:
-                    avail.pop(avail.index(r))
-
+        # No choice for you, everyone's a writer too!
+        self.num_writers =  psize
+    
     def run(self, handler, pool):
         # If inline, we use forks.
         server_pid = 0
         if pool.comm.rank == 0:
             server_pid = os.fork()
             if server_pid == 0:
                 handler.start_server()
                 os._exit(0)
-        # Start writers.
+        # Start writers on all.
         writer_pid = 0
-        if pool.comm.rank in self.writers:
-            time.sleep(0.05 + pool.comm.rank/10.0)
-            writer_pid = os.fork()
-            if writer_pid == 0:
-                handler.start_writer()
-                os._exit(0)
-        # Start readers, not forked.
-        if pool.comm.rank in self.readers:
-            time.sleep(0.05 + pool.comm.rank/10.0)
-            handler.start_reader()
+        time.sleep(0.05 + pool.comm.rank/10.0)
+        writer_pid = os.fork()
+        if writer_pid == 0:
+            handler.start_writer()
+            os._exit(0)
+        # Everyone's a reader!
+        time.sleep(0.05 + pool.comm.rank/10.0)
+        handler.start_reader()
         # Make sure the forks are done, which they should be.
         if writer_pid != 0:
             os.waitpid(writer_pid, 0)
                     self.num_readers, self.num_writers, psize)
             raise RuntimeError
     
-    def split_work(self, pool):
-        self.readers = np.arange(self.num_readers) + 1
-        self.writers = np.arange(self.num_writers) + 1 + self.num_readers
-    
     def run(self, handler, wg):
         # Not inline so we just launch them directly from our MPI threads.
         if wg.name == "server":
         ParallelAnalysisInterface.__init__(self)
         # Decide how we're working.
         if ytcfg.getboolean("yt", "inline") == True:
-            self.runner = InlineRunner(num_writers)
+            self.runner = InlineRunner()
         else:
             self.runner = StandardRunner(num_readers, num_writers)
         self.num_readers = self.runner.num_readers
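
For reference, a minimal, self-contained sketch of the fork-and-read pattern that the simplified InlineRunner.run() now applies on every rank: fork a writer child, act as a reader in the parent, then reap the child. The helper run_inline and its callback arguments are hypothetical stand-ins for illustration, not part of the yt API.

    import os
    import time

    def run_inline(rank, start_writer, start_reader):
        # Hypothetical helper: every rank forks a writer child and then acts
        # as a reader in the parent, matching the "everyone is a writer and
        # a reader" behavior of the new InlineRunner.
        time.sleep(0.05 + rank / 10.0)  # stagger the forks, as in the diff
        writer_pid = os.fork()
        if writer_pid == 0:
            start_writer()  # child: write, then exit without returning
            os._exit(0)
        start_reader()      # parent: every rank also reads
        os.waitpid(writer_pid, 0)  # reap the writer child when it finishes

Each rank would call something like run_inline(pool.comm.rank, handler.start_writer, handler.start_reader). Only the writer is forked; the reader runs in the parent process, as in the diff above.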