Commits

Stephen Skory committed 1b2b0d4 (merge)

Merged in Matt's changes.

  • Parent commits ee235fb, 27fa49e


Files changed (3)

File doc/install_script.sh

 INST_PYX=0      # Install PyX?  Sometimes PyX can be problematic without a
                 # working TeX installation.
 INST_0MQ=1      # Install 0mq (for IPython) and affiliated bindings?
-INST_ROCKSTAR=1 # Install the Rockstar halo finder?
+INST_ROCKSTAR=0 # Install the Rockstar halo finder?
 
 # If you've got YT some other place, set this to point to it.
 YT_DIR=""
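
Note for anyone using the install script: to build the Rockstar halo finder (and the bindings touched below), set INST_ROCKSTAR=1 in doc/install_script.sh before running it; as of this commit it is left off by default.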

File yt/analysis_modules/halo_finding/rockstar/rockstar.py

 from os import mkdir
 from os import path
 
-# Get some definitions from Rockstar directly.
-if "ROCKSTAR_DIR" in os.environ:
-    ROCKSTAR_DIR = os.environ["ROCKSTAR_DIR"]
-elif os.path.exists("rockstar.cfg"):
-    ROCKSTAR_DIR = open("rockstar.cfg").read().strip()
-else:
-    print "Reading Rockstar location from rockstar.cfg failed."
-    print "Please place the base directory of your"
-    print "Rockstar install in rockstar.cfg and restart."
-    print "(ex: \"echo '/path/to/Rockstar-0.99' > rockstar.cfg\" )"
-    sys.exit(1)
-lines = file(path.join(ROCKSTAR_DIR, 'server.h'))
-READER_TYPE = None
-WRITER_TYPE = None
-for line in lines:
-    if "READER_TYPE" in line:
-        line = line.split()
-        READER_TYPE = int(line[-1])
-    if "WRITER_TYPE" in line:
-        line = line.split()
-        WRITER_TYPE = int(line[-1])
-    if READER_TYPE != None and WRITER_TYPE != None:
-        break
-lines.close()
-
 class InlineRunner(ParallelAnalysisInterface):
     def __init__(self, num_writers):
         # If this is being run inline, num_readers == comm.size, always.
-        self.num_readers = ytcfg.getint("yt", "__global_parallel_size")
+        psize = ytcfg.getint("yt", "__global_parallel_size")
+        self.num_readers = psize
         if num_writers is None:
-            self.num_writers =  ytcfg.getint("yt", "__global_parallel_size")
+            self.num_writers =  psize
         else:
-            self.num_writers = min(num_writers,
-                ytcfg.getint("yt", "__global_parallel_size"))
+            self.num_writers = min(num_writers, psize)
 
     def split_work(self, pool):
         avail = range(pool.comm.size)
             time.sleep(0.1 + pool.comm.rank/10.0)
             writer_pid = os.fork()
             if writer_pid == 0:
-                handler.start_client(WRITER_TYPE)
+                handler.start_writer()
                 os._exit(0)
         # Start readers, not forked.
         if pool.comm.rank in self.readers:
             time.sleep(0.1 + pool.comm.rank/10.0)
-            handler.start_client(READER_TYPE)
+            handler.start_reader()
         # Make sure the forks are done, which they should be.
         if writer_pid != 0:
             os.waitpid(writer_pid, 0)
 class StandardRunner(ParallelAnalysisInterface):
     def __init__(self, num_readers, num_writers):
         self.num_readers = num_readers
+        psize = ytcfg.getint("yt", "__global_parallel_size")
         if num_writers is None:
-            self.num_writers = ytcfg.getint("yt", "__global_parallel_size") \
-                - num_readers - 1
+            self.num_writers =  psize - num_readers - 1
         else:
-            self.num_writers = min(num_writers,
-                ytcfg.getint("yt", "__global_parallel_size"))
-        if self.num_readers + self.num_writers + 1 != ytcfg.getint("yt", \
-                "__global_parallel_size"):
+            self.num_writers = min(num_writers, psize)
+        if self.num_readers + self.num_writers + 1 != psize:
             mylog.error('%i reader + %i writers != %i mpi',
-                    self.num_readers, self.num_writers,
-                    ytcfg.getint("yt", "__global_parallel_size"))
+                    self.num_readers, self.num_writers, psize)
             raise RuntimeError
     
-    def split_work(self, pool):
-        # Who is going to do what.
-        avail = range(pool.comm.size)
-        self.writers = []
-        self.readers = []
-        # If we're not running inline, rank 0 should be removed immediately.
-        avail.pop(0)
-        # Now we assign the rest.
-        for i in range(self.num_readers):
-            self.readers.append(avail.pop(0))
-        for i in range(self.num_writers):
-            self.writers.append(avail.pop(0))
+    def split_work(self):
+        self.readers = np.arange(self.num_readers) + 1
+        self.writers = np.arange(self.num_writers) + 1 + self.num_readers
     
-    def run(self, handler, pool):
+    def run(self, handler, wg):
         # Not inline so we just launch them directly from our MPI threads.
-        if pool.comm.rank == 0:
+        if wg.name == "server":
             handler.start_server()
-        if pool.comm.rank in self.readers:
-            time.sleep(0.1 + pool.comm.rank/10.0)
-            handler.start_client(READER_TYPE)
-        if pool.comm.rank in self.writers:
-            time.sleep(0.2 + pool.comm.rank/10.0)
-            handler.start_client(WRITER_TYPE)
+        if wg.name == "readers":
+            time.sleep(0.1)
+            handler.start_reader()
+        if wg.name == "writers":
+            time.sleep(0.2)
+            handler.start_writer()
 
 class RockstarHaloFinder(ParallelAnalysisInterface):
-    def __init__(self, ts, num_readers = 1, num_writers = None, 
-            outbase=None,particle_mass=-1.0,dm_type=1,force_res=None):
+    def __init__(self, ts, num_readers = 1, num_writers = None,
+            outbase="rockstar_halos", particle_mass=-1.0, dm_type=1, 
+            force_res=None):
         r"""Spawns the Rockstar Halo finder, distributes dark matter
         particles and finds halos.
 
         from yt.mods import *
         import sys
 
-        files = glob.glob('/u/cmoody3/data/a*')
-        files.sort()
-        ts = TimeSeriesData.from_filenames(files)
+        ts = TimeSeriesData.from_filenames('/u/cmoody3/data/a*')
         pm = 7.81769027e+11
         rh = RockstarHaloFinder(ts, particle_mass=pm)
         rh.run()
         """
+        ParallelAnalysisInterface.__init__(self)
         # Decide how we're working.
         if ytcfg.getboolean("yt", "inline") == True:
             self.runner = InlineRunner(num_writers)
         # Note that Rockstar does not support subvolumes.
         # We assume that all of the snapshots in the time series
         # use the same domain info as the first snapshots.
-        if not isinstance(ts,TimeSeriesData):
+        if not isinstance(ts, TimeSeriesData):
             ts = TimeSeriesData([ts])
         self.ts = ts
         self.dm_type = dm_type
         tpf = ts.__iter__().next()
-        def _particle_count(field, data):
-            try:
-                return (data["particle_type"]==dm_type).sum()
-            except KeyError:
-                return np.prod(data["particle_position_x"].shape)
-        add_field("particle_count",function=_particle_count, not_in_all=True,
-            particle_type=True)
-        # Get total_particles in parallel.
-        dd = tpf.h.all_data()
-        self.total_particles = int(dd.quantities['TotalQuantity']('particle_count')[0])
-        self.hierarchy = tpf.h
-        self.particle_mass = particle_mass 
-        self.center = (tpf.domain_right_edge + tpf.domain_left_edge)/2.0
-        if outbase is None:
-            outbase = 'rockstar_halos'
         self.outbase = outbase
         self.particle_mass = particle_mass
         if force_res is None:
-            self.force_res = ts[-1].h.get_smallest_dx() * ts[-1]['mpch']
+            tpf = ts[-1] # Cache a reference
+            self.force_res = tpf.h.get_smallest_dx() * tpf['mpch']
+            # We have to delete now to wipe the hierarchy
+            del tpf
         else:
             self.force_res = force_res
-        self.left_edge = tpf.domain_left_edge
-        self.right_edge = tpf.domain_right_edge
-        self.center = (tpf.domain_right_edge + tpf.domain_left_edge)/2.0
         # We set up the workgroups *before* initializing
         # ParallelAnalysisInterface. Everyone is their own workgroup!
         self.pool = ProcessorPool()
-        for i in range(ytcfg.getint("yt", "__global_parallel_size")):
-             self.pool.add_workgroup(size=1)
-        ParallelAnalysisInterface.__init__(self)
-        for wg in self.pool.workgroups:
-            if self.pool.comm.rank in wg.ranks:
-                self.workgroup = wg
-        self.handler = rockstar_interface.RockstarInterface(
-                self.ts, dd)
+        self.pool, self.workgroup = ProcessorPool.from_sizes(
+           [ (1, "server"),
+             (self.num_readers, "readers"),
+             (self.num_writers, "writers") ]
+        )
+        p = self._setup_parameters(ts)
+        params = self.comm.mpi_bcast(p, root = self.pool['readers'].ranks[0])
+        self.__dict__.update(params)
+        self.handler = rockstar_interface.RockstarInterface(self.ts)
+
+    def _setup_parameters(self, ts):
+        if self.workgroup.name != "readers": return None
+        tpf = ts[0]
+        def _particle_count(field, data):
+            try:
+                return (data["particle_type"]==self.dm_type).sum()
+            except KeyError:
+                return np.prod(data["particle_position_x"].shape)
+        add_field("particle_count", function=_particle_count,
+                  not_in_all=True, particle_type=True)
+        # Get total_particles in parallel.
+        dd = tpf.h.all_data()
+        p = {}
+        p['total_particles'] = int(dd.quantities['TotalQuantity']('particle_count')[0])
+        p['left_edge'] = tpf.domain_left_edge
+        p['right_edge'] = tpf.domain_right_edge
+        p['center'] = (tpf.domain_right_edge + tpf.domain_left_edge)/2.0
+        return p
 
     def __del__(self):
         self.pool.free_all()
 
     def _get_hosts(self):
-        if self.pool.comm.size == 1 or self.pool.comm.rank == 0:
+        if self.comm.rank == 0 or self.comm.size == 1:
             server_address = socket.gethostname()
             sock = socket.socket()
             sock.bind(('', 0))
             del sock
         else:
             server_address, port = None, None
-        self.server_address, self.port = self.pool.comm.mpi_bcast(
+        self.server_address, self.port = self.comm.mpi_bcast(
             (server_address, port))
         self.port = str(self.port)
 
         self.handler.setup_rockstar(self.server_address, self.port,
                     len(self.ts), self.total_particles, 
                     self.dm_type,
-                    parallel = self.pool.comm.size > 1,
+                    parallel = self.comm.size > 1,
                     num_readers = self.num_readers,
                     num_writers = self.num_writers,
                     writing_port = -1,
                     block_ratio = block_ratio,
                     outbase = self.outbase,
-                    force_res=self.force_res,
+                    force_res = self.force_res,
                     particle_mass = float(self.particle_mass),
                     **kwargs)
         # Make the directory to store the halo lists in.
-        if self.pool.comm.rank == 0:
+        if self.comm.rank == 0:
             if not os.path.exists(self.outbase):
-                os.mkdir(self.outbase)
+                os.makedirs(self.outbase)
             # Make a record of which dataset corresponds to which set of
             # output files because it will be easy to lose this connection.
             fp = open(self.outbase + '/pfs.txt', 'w')
                 fp.write(line)
             fp.close()
         # This barrier makes sure the directory exists before it might be used.
-        self.pool.comm.barrier()
-        if self.pool.comm.size == 1:
+        self.comm.barrier()
+        if self.comm.size == 1:
             self.handler.call_rockstar()
         else:
             # Split up the work.
-            self.runner.split_work(self.pool)
+            self.runner.split_work()
             # And run it!
-            self.runner.run(self.handler, self.pool)
-        self.pool.comm.barrier()
+            self.runner.run(self.handler, self.workgroup)
+        self.comm.barrier()
         self.pool.free_all()
     
     def halo_list(self,file_name='out_0.list'):
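
For orientation, the rewritten StandardRunner assumes a fixed rank layout: rank 0 runs the Rockstar server, the next num_readers ranks read particles, and the remaining num_writers ranks write halos, and its __init__ now aborts unless num_readers + num_writers + 1 equals the global MPI size. A minimal standalone sketch of that arithmetic (plain Python/NumPy, no yt imports; rank_layout is an illustrative name, not part of the module):

    import numpy as np

    def rank_layout(num_readers, num_writers, psize):
        # Mirrors StandardRunner: rank 0 is the server, readers occupy the
        # next num_readers ranks, writers the num_writers ranks after that.
        if num_readers + num_writers + 1 != psize:
            raise RuntimeError("%i readers + %i writers + 1 server != %i ranks"
                               % (num_readers, num_writers, psize))
        readers = np.arange(num_readers) + 1
        writers = np.arange(num_writers) + 1 + num_readers
        return readers, writers

    # e.g. eight MPI tasks with a single reader:
    # rank_layout(1, 6, 8) -> (array([1]), array([2, 3, 4, 5, 6, 7]))

The ProcessorPool.from_sizes call in RockstarHaloFinder.__init__ builds workgroups of exactly these sizes under the names "server", "readers" and "writers", which are the names StandardRunner.run switches on.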

File yt/analysis_modules/halo_finding/rockstar/rockstar_interface.pyx

 
 cdef import from "server.h" nogil:
     int server()
+    np.int64_t READER_TYPE
+    np.int64_t WRITER_TYPE
 
 cdef import from "client.h" nogil:
     void client(np.int64_t in_type)
 
 cdef import from "meta_io.h":
     void read_particles(char *filename)
-    void output_and_free_halos(np.int64_t id_offset, np.int64_t snap, 
+    void output_halos(np.int64_t id_offset, np.int64_t snap, 
 			   np.int64_t chunk, float *bounds)
 
 cdef import from "config_vars.h":
     np.float64_t AVG_PARTICLE_SPACING
     np.int64_t SINGLE_SNAP
 
-def print_rockstar_settings():
-    # We have to do the config
-    print "FILE_FORMAT =", FILE_FORMAT
-    print "PARTICLE_MASS =", PARTICLE_MASS
+# Forward declare
+cdef class RockstarInterface
 
-    print "MASS_DEFINITION =", MASS_DEFINITION
-    print "MIN_HALO_OUTPUT_SIZE =", MIN_HALO_OUTPUT_SIZE
-    print "FORCE_RES =", FORCE_RES
-
-    print "SCALE_NOW =", SCALE_NOW
-    print "h0 =", h0
-    print "Ol =", Ol
-    print "Om =", Om
-
-    print "GADGET_ID_BYTES =", GADGET_ID_BYTES
-    print "GADGET_MASS_CONVERSION =", GADGET_MASS_CONVERSION
-    print "GADGET_LENGTH_CONVERSION =", GADGET_LENGTH_CONVERSION
-    print "GADGET_SKIP_NON_HALO_PARTICLES =", GADGET_SKIP_NON_HALO_PARTICLES
-    print "RESCALE_PARTICLE_MASS =", RESCALE_PARTICLE_MASS
-
-    print "PARALLEL_IO =", PARALLEL_IO
-    print "PARALLEL_IO_SERVER_ADDRESS =", PARALLEL_IO_SERVER_ADDRESS
-    print "PARALLEL_IO_SERVER_PORT =", PARALLEL_IO_SERVER_PORT
-    print "PARALLEL_IO_WRITER_PORT =", PARALLEL_IO_WRITER_PORT
-    print "PARALLEL_IO_SERVER_INTERFACE =", PARALLEL_IO_SERVER_INTERFACE
-    print "RUN_ON_SUCCESS =", RUN_ON_SUCCESS
-
-    print "INBASE =", INBASE
-    print "FILENAME =", FILENAME
-    print "STARTING_SNAP =", STARTING_SNAP
-    print "NUM_SNAPS =", NUM_SNAPS
-    print "NUM_BLOCKS =", NUM_BLOCKS
-    print "NUM_READERS =", NUM_READERS
-    print "PRELOAD_PARTICLES =", PRELOAD_PARTICLES
-    print "SNAPSHOT_NAMES =", SNAPSHOT_NAMES
-    print "LIGHTCONE_ALT_SNAPS =", LIGHTCONE_ALT_SNAPS
-    print "BLOCK_NAMES =", BLOCK_NAMES
-
-    print "OUTBASE =", OUTBASE
-    print "OVERLAP_LENGTH =", OVERLAP_LENGTH
-    print "NUM_WRITERS =", NUM_WRITERS
-    print "FORK_READERS_FROM_WRITERS =", FORK_READERS_FROM_WRITERS
-    print "FORK_PROCESSORS_PER_MACHINE =", FORK_PROCESSORS_PER_MACHINE
-
-    print "OUTPUT_FORMAT =", OUTPUT_FORMAT
-    print "DELETE_BINARY_OUTPUT_AFTER_FINISHED =", DELETE_BINARY_OUTPUT_AFTER_FINISHED
-    print "FULL_PARTICLE_CHUNKS =", FULL_PARTICLE_CHUNKS
-    print "BGC2_SNAPNAMES =", BGC2_SNAPNAMES
-
-    print "BOUND_PROPS =", BOUND_PROPS
-    print "BOUND_OUT_TO_HALO_EDGE =", BOUND_OUT_TO_HALO_EDGE
-    print "DO_MERGER_TREE_ONLY =", DO_MERGER_TREE_ONLY
-    print "IGNORE_PARTICLE_IDS =", IGNORE_PARTICLE_IDS
-    print "TRIM_OVERLAP =", TRIM_OVERLAP
-    print "ROUND_AFTER_TRIM =", ROUND_AFTER_TRIM
-    print "LIGHTCONE =", LIGHTCONE
-    print "PERIODIC =", PERIODIC
-
-    print "LIGHTCONE_ORIGIN =", LIGHTCONE_ORIGIN[0]
-    print "LIGHTCONE_ORIGIN[1] =", LIGHTCONE_ORIGIN[1]
-    print "LIGHTCONE_ORIGIN[2] =", LIGHTCONE_ORIGIN[2]
-    print "LIGHTCONE_ALT_ORIGIN =", LIGHTCONE_ALT_ORIGIN[0]
-    print "LIGHTCONE_ALT_ORIGIN[1] =", LIGHTCONE_ALT_ORIGIN[1]
-    print "LIGHTCONE_ALT_ORIGIN[2] =", LIGHTCONE_ALT_ORIGIN[2]
-
-    print "LIMIT_CENTER =", LIMIT_CENTER[0]
-    print "LIMIT_CENTER[1] =", LIMIT_CENTER[1]
-    print "LIMIT_CENTER[2] =", LIMIT_CENTER[2]
-    print "LIMIT_RADIUS =", LIMIT_RADIUS
-
-    print "SWAP_ENDIANNESS =", SWAP_ENDIANNESS
-    print "GADGET_VARIANT =", GADGET_VARIANT
-
-    print "FOF_FRACTION =", FOF_FRACTION
-    print "FOF_LINKING_LENGTH =", FOF_LINKING_LENGTH
-    print "INCLUDE_HOST_POTENTIAL_RATIO =", INCLUDE_HOST_POTENTIAL_RATIO
-    print "DOUBLE_COUNT_SUBHALO_MASS_RATIO =", DOUBLE_COUNT_SUBHALO_MASS_RATIO
-    print "TEMPORAL_HALO_FINDING =", TEMPORAL_HALO_FINDING
-    print "MIN_HALO_PARTICLES =", MIN_HALO_PARTICLES
-    print "UNBOUND_THRESHOLD =", UNBOUND_THRESHOLD
-    print "ALT_NFW_METRIC =", ALT_NFW_METRIC
-
-    print "TOTAL_PARTICLES =", TOTAL_PARTICLES
-    print "BOX_SIZE =", BOX_SIZE
-    print "OUTPUT_HMAD =", OUTPUT_HMAD
-    print "OUTPUT_PARTICLES =", OUTPUT_PARTICLES
-    print "OUTPUT_LEVELS =", OUTPUT_LEVELS
-    print "DUMP_PARTICLES =", DUMP_PARTICLES[0]
-    print "DUMP_PARTICLES[1] =", DUMP_PARTICLES[1]
-    print "DUMP_PARTICLES[2] =", DUMP_PARTICLES[2]
-
-    print "AVG_PARTICLE_SPACING =", AVG_PARTICLE_SPACING
-    print "SINGLE_SNAP =", SINGLE_SNAP
-
-cdef class RockstarInterface
 cdef void rh_read_particles(char *filename, particle **p, np.int64_t *num_p) with gil:
     global SCALE_NOW
     cdef np.float64_t conv[6], left_edge[6]
     block = int(str(filename).rsplit(".")[-1])
     n = rh.block_ratio
 
-    all_grids = pf.h.grids
     SCALE_NOW = 1.0/(pf.current_redshift+1.0)
     # Now we want to grab data from only a subset of the grids for each reader.
-    if NUM_BLOCKS == 1:
-        grids = all_grids
-    else:
-        if ytcfg.getboolean("yt", "inline") == False:
-            fnames = np.array([g.filename for g in all_grids])
-            sort = fnames.argsort()
-            grids = np.array_split(all_grids[sort], NUM_BLOCKS)[block]
-        else:
-            # We must be inline, grap only the local grids.
-            grids  = [g for g in all_grids if g.proc_num ==
-                          ytcfg.getint('yt','__topcomm_parallel_rank')]
-    
     all_fields = set(pf.h.derived_field_list + pf.h.field_list)
 
     # First we need to find out how many this reader is going to read in
     # if the number of readers > 1.
     if NUM_BLOCKS > 1:
         local_parts = 0
-        for g in grids:
+        for g in pf.h._get_objs("grids"):
             if g.NumberOfParticles == 0: continue
             if "particle_type" in all_fields:
-                #iddm = dd._get_data_from_grid(g, "particle_type")==rh.dm_type
                 iddm = g["particle_type"] == rh.dm_type
             else:
                 iddm = Ellipsis
     left_edge[2] = pf.domain_left_edge[2]
     left_edge[3] = left_edge[4] = left_edge[5] = 0.0
     pi = 0
-    for g in grids:
+    for g in pf.h._get_objs("grids"):
         if g.NumberOfParticles == 0: continue
         if "particle_type" in all_fields:
             iddm = g["particle_type"] == rh.dm_type
     cdef public int dm_type
     cdef public int total_particles
 
-    def __cinit__(self, ts, data_source):
+    def __cinit__(self, ts):
         self.ts = ts
         self.tsl = ts.__iter__() #timeseries generator used by read
-        self.data_source = data_source
 
     def setup_rockstar(self, char *server_address, char *server_port,
                        int num_snaps, np.int64_t total_particles,
     def call_rockstar(self):
         read_particles("generic")
         rockstar(NULL, 0)
-        output_and_free_halos(0, 0, 0, NULL)
+        output_halos(0, 0, 0, NULL)
 
     def start_server(self):
         with nogil:
             server()
 
-    def start_client(self, in_type):
-        in_type = np.int64(in_type)
+    def start_reader(self):
+        cdef np.int64_t in_type = np.int64(READER_TYPE)
         client(in_type)
+
+    def start_writer(self):
+        cdef np.int64_t in_type = np.int64(WRITER_TYPE)
+        client(in_type)
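
As a closing usage example, the docstring updated in rockstar.py above sketches the intended driver pattern. Expanded only slightly here as a plain script (the glob pattern and particle mass are the docstring's example values, and the import assumes picking the class up directly from the module in this diff):

    from yt.mods import *
    from yt.analysis_modules.halo_finding.rockstar.rockstar import \
        RockstarHaloFinder

    # Example values from the docstring in this commit; substitute your own
    # dataset glob and dark-matter particle mass.
    ts = TimeSeriesData.from_filenames('/u/cmoody3/data/a*')
    rh = RockstarHaloFinder(ts, particle_mass=7.81769027e+11)
    # Typically launched under MPI with 1 server + num_readers + num_writers ranks.
    rh.run()

Catalogs are written under the outbase directory ('rockstar_halos' by default after this change), and rh.halo_list() defaults to reading out_0.list from it.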