Commits

Britton Smith committed 3ffe2b2 Merge

Merged in MatthewTurk/yt (pull request #353)

  • Parent commits 61f553c, c65043d

Comments (0)

Files changed (4)

doc/install_script.sh

 echo 'c13116c1f0547000cc565e15774687b9e884f8b74fb62a84e578408a868a84961704839065ae4f21b662e87f2aaedf6ea424ea58dfa9d3d73c06281f806d15dd  nose-1.2.1.tar.gz' > nose-1.2.1.tar.gz.sha512
 echo '73de2c99406a38f85273931597525cec4ebef55b93712adca3b0bfea8ca3fc99446e5d6495817e9ad55cf4d48feb7fb49734675c4cc8938db8d4a5225d30eca7  python-hglib-0.2.tar.gz' > python-hglib-0.2.tar.gz.sha512
 echo 'ffc602eb346717286b3d0a6770c60b03b578b3cf70ebd12f9e8b1c8c39cdb12ef219ddaa041d7929351a6b02dbb8caf1821b5452d95aae95034cbf4bc9904a7a  sympy-0.7.2.tar.gz' > sympy-0.7.2.tar.gz.sha512
-echo 'd91fa23d8f86c5f5b3df210731c14c92a2e3d999979ef2c4c67be3dda19a8a8d2363f8cd23611957280c17f61653b0e66f59d3f5515b6d443b22cddeaef86ab6  Rockstar-0.99.tar.gz' > Rockstar-0.99.tar.gz.sha512
-
+echo '172f2bc671145ebb0add2669c117863db35851fb3bdb192006cd710d4d038e0037497eb39a6d01091cb923f71a7e8982a77b6e80bf71d6275d5d83a363c8d7e5  rockstar-0.99.6.tar.gz' > rockstar-0.99.6.tar.gz.sha512
 # Individual processes
 [ -z "$HDF5_DIR" ] && get_ytproject hdf5-1.8.9.tar.gz
 [ $INST_ZLIB -eq 1 ] && get_ytproject zlib-1.2.3.tar.bz2 
 # Now we build Rockstar and set its environment variable.
 if [ $INST_ROCKSTAR -eq 1 ]
 then
-    if [ ! -e Rockstar-0.99/done ]
+    if [ ! -e Rockstar/done ]
     then
-        [ ! -e Rockstar-0.99 ] && tar xfz Rockstar-0.99.tar.gz
+        [ ! -e Rockstar ] && tar xfz rockstar-0.99.6.tar.gz
         echo "Building Rockstar"
-        cd Rockstar-0.99
+        cd Rockstar
         ( make lib 2>&1 ) 1>> ${LOG_FILE} || do_exit
         cp librockstar.so ${DEST_DIR}/lib
-        ROCKSTAR_DIR=${DEST_DIR}/src/Rockstar-0.99
+        ROCKSTAR_DIR=${DEST_DIR}/src/Rockstar
         echo $ROCKSTAR_DIR > ${YT_DIR}/rockstar.cfg
         touch done
         cd ..
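
The echo lines above record SHA-512 digests in sidecar *.sha512 files next to each tarball the script downloads. As an illustration only, not part of this commit and not necessarily how the install script itself performs the check, the digest recorded for the new Rockstar tarball could be verified in Python roughly as follows (verify_sha512 is a hypothetical helper):

    # Sketch: compare a downloaded tarball against the digest recorded by
    # install_script.sh; the .sha512 file holds one line, "<sha512>  <filename>".
    import hashlib

    def verify_sha512(tarball, digest_file):
        expected = open(digest_file).read().split()[0]
        h = hashlib.sha512()
        with open(tarball, "rb") as f:
            for chunk in iter(lambda: f.read(1 << 20), b""):
                h.update(chunk)
        return h.hexdigest() == expected

    print(verify_sha512("rockstar-0.99.6.tar.gz", "rockstar-0.99.6.tar.gz.sha512"))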

yt/analysis_modules/halo_finding/halo_objects.py

     dont_wrap = ["get_sphere", "write_particle_list"]
     extra_wrap = ["__getitem__"]
 
-    # Used for Loaded and RockstarHalos
-    _saved_fields = {}
-    _ds_sort = None
-    _particle_mask = None
-
     def __init__(self, halo_list, id, indices=None, size=None, CoM=None,
         max_dens_point=None, group_total_mass=None, max_radius=None,
         bulk_vel=None, tasks=None, rms_vel=None, supp=None):
             self.supp = {}
         else:
             self.supp = supp
+        self._saved_fields = {}
+        self._ds_sort = None
+        self._particle_mask = None
 
     @property
     def particle_mask(self):
     _halo_dt = np.dtype([('id', np.int64), ('pos', (np.float32, 6)),
         ('corevel', (np.float32, 3)), ('bulkvel', (np.float32, 3)),
         ('m', np.float32), ('r', np.float32), ('child_r', np.float32),
+        ('vmax_r', np.float32), 
         ('mgrav', np.float32), ('vmax', np.float32),
         ('rvmax', np.float32), ('rs', np.float32),
+        ('klypin_rs', np.float32), 
         ('vrms', np.float32), ('J', (np.float32, 3)),
         ('energy', np.float32), ('spin', np.float32),
-        ('padding1', np.float32), ('num_p', np.int64),
+        ('alt_m', (np.float32, 4)), ('Xoff', np.float32),
+        ('Voff', np.float32), ('b_to_a', np.float32),
+        ('c_to_a', np.float32), ('A', (np.float32, 3)),
+        ('bullock_spin', np.float32), ('kin_to_pot', np.float32),
+        ('num_p', np.int64),
         ('num_child_particles', np.int64), ('p_start', np.int64),
         ('desc', np.int64), ('flags', np.int64), ('n_core', np.int64),
         ('min_pos_err', np.float32), ('min_vel_err', np.float32),
         ('min_bulkvel_err', np.float32), ('padding2', np.float32),])
-    # Above, padding1&2 are due to c byte ordering which pads between
+    # Above, padding* are due to c byte ordering which pads between
     # 4 and 8 byte values in the struct as to not overlap memory registers.
-    _tocleanup = ['padding1', 'padding2']
+    _tocleanup = ['padding2']
 
     def __init__(self, pf, out_list):
         ParallelAnalysisInterface.__init__(self)
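
The _halo_dt dtype above mirrors Rockstar's on-disk C halo struct field for field; the padding entries absorb the compiler's alignment gaps between 4- and 8-byte members, and _tocleanup lists the columns to drop after reading. A minimal sketch of that read pattern, not part of this commit, using an abbreviated dtype and a hypothetical read_halos helper, and ignoring any file header the real reader deals with:

    # Sketch: read fixed-size C struct records with a structured dtype and
    # strip the padding fields afterwards. The dtype is an illustrative subset
    # of _halo_dt, and `f` is assumed to sit at the first record.
    import numpy as np

    halo_dt = np.dtype([('id', np.int64), ('m', np.float32),
                        ('padding2', np.float32)])
    to_cleanup = ['padding2']

    def read_halos(f, halo_dt, to_cleanup, count=-1):
        halos = np.fromfile(f, dtype=halo_dt, count=count)  # one struct per record
        keep = [n for n in halo_dt.names if n not in to_cleanup]
        return halos[keep]                                  # drop padding columns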

yt/analysis_modules/halo_finding/rockstar/rockstar.py

 from os import mkdir
 from os import path
 
-# Get some definitions from Rockstar directly.
-if "ROCKSTAR_DIR" in os.environ:
-    ROCKSTAR_DIR = os.environ["ROCKSTAR_DIR"]
-elif os.path.exists("rockstar.cfg"):
-    ROCKSTAR_DIR = open("rockstar.cfg").read().strip()
-else:
-    print "Reading Rockstar location from rockstar.cfg failed."
-    print "Please place the base directory of your"
-    print "Rockstar install in rockstar.cfg and restart."
-    print "(ex: \"echo '/path/to/Rockstar-0.99' > rockstar.cfg\" )"
-    sys.exit(1)
-lines = file(path.join(ROCKSTAR_DIR, 'server.h'))
-READER_TYPE = None
-WRITER_TYPE = None
-for line in lines:
-    if "READER_TYPE" in line:
-        line = line.split()
-        READER_TYPE = int(line[-1])
-    if "WRITER_TYPE" in line:
-        line = line.split()
-        WRITER_TYPE = int(line[-1])
-    if READER_TYPE != None and WRITER_TYPE != None:
-        break
-lines.close()
-
 class InlineRunner(ParallelAnalysisInterface):
-    def __init__(self, num_writers):
+    def __init__(self):
         # If this is being run inline, num_readers == comm.size, always.
-        self.num_readers = ytcfg.getint("yt", "__global_parallel_size")
-        if num_writers is None:
-            self.num_writers =  ytcfg.getint("yt", "__global_parallel_size")
-        else:
-            self.num_writers = min(num_writers,
-                ytcfg.getint("yt", "__global_parallel_size"))
-
-    def split_work(self, pool):
-        avail = range(pool.comm.size)
-        self.writers = []
-        self.readers = []
-        # If we're inline, everyone is a reader.
-        self.readers = avail[:]
-        if self.num_writers == pool.comm.size:
-            # And everyone is a writer!
-            self.writers = avail[:]
-        else:
-            # Everyone is not a writer.
-            # Cyclically assign writers which should approximate
-            # memory load balancing (depending on the mpirun call,
-            # but this should do it in most cases).
-            stride = int(ceil(float(pool.comm.size) / self.num_writers))
-            while len(self.writers) < self.num_writers:
-                self.writers.extend(avail[::stride])
-                for r in readers:
-                    avail.pop(avail.index(r))
-
+        psize = ytcfg.getint("yt", "__global_parallel_size")
+        self.num_readers = psize
+        # No choice for you, everyone's a writer too!
+        self.num_writers =  psize
+    
     def run(self, handler, pool):
         # If inline, we use forks.
         server_pid = 0
             if server_pid == 0:
                 handler.start_server()
                 os._exit(0)
-        # Start writers.
+        # Start writers on all.
         writer_pid = 0
-        if pool.comm.rank in self.writers:
-            time.sleep(0.1 + pool.comm.rank/10.0)
-            writer_pid = os.fork()
-            if writer_pid == 0:
-                handler.start_client(WRITER_TYPE)
-                os._exit(0)
-        # Start readers, not forked.
-        if pool.comm.rank in self.readers:
-            time.sleep(0.1 + pool.comm.rank/10.0)
-            handler.start_client(READER_TYPE)
+        time.sleep(0.05 + pool.comm.rank/10.0)
+        writer_pid = os.fork()
+        if writer_pid == 0:
+            handler.start_writer()
+            os._exit(0)
+        # Everyone's a reader!
+        time.sleep(0.05 + pool.comm.rank/10.0)
+        handler.start_reader()
         # Make sure the forks are done, which they should be.
         if writer_pid != 0:
             os.waitpid(writer_pid, 0)
         if server_pid != 0:
             os.waitpid(server_pid, 0)
 
+    def setup_pool(self):
+        pool = ProcessorPool()
+        # Everyone is a reader, and when we're inline, that's all that matters.
+        readers = np.arange(ytcfg.getint("yt", "__global_parallel_size"))
+        pool.add_workgroup(ranks=readers, name="readers")
+        return pool, pool.workgroups[0]
+
 class StandardRunner(ParallelAnalysisInterface):
     def __init__(self, num_readers, num_writers):
         self.num_readers = num_readers
+        psize = ytcfg.getint("yt", "__global_parallel_size")
         if num_writers is None:
-            self.num_writers = ytcfg.getint("yt", "__global_parallel_size") \
-                - num_readers - 1
+            self.num_writers =  psize - num_readers - 1
         else:
-            self.num_writers = min(num_writers,
-                ytcfg.getint("yt", "__global_parallel_size"))
-        if self.num_readers + self.num_writers + 1 != ytcfg.getint("yt", \
-                "__global_parallel_size"):
+            self.num_writers = min(num_writers, psize)
+        if self.num_readers + self.num_writers + 1 != psize:
             mylog.error('%i reader + %i writers != %i mpi',
-                    self.num_readers, self.num_writers,
-                    ytcfg.getint("yt", "__global_parallel_size"))
+                    self.num_readers, self.num_writers, psize)
             raise RuntimeError
     
-    def split_work(self, pool):
-        # Who is going to do what.
-        avail = range(pool.comm.size)
-        self.writers = []
-        self.readers = []
-        # If we're not running inline, rank 0 should be removed immediately.
-        avail.pop(0)
-        # Now we assign the rest.
-        for i in range(self.num_readers):
-            self.readers.append(avail.pop(0))
-        for i in range(self.num_writers):
-            self.writers.append(avail.pop(0))
+    def run(self, handler, wg):
+        # Not inline so we just launch them directly from our MPI threads.
+        if wg.name == "server":
+            handler.start_server()
+        if wg.name == "readers":
+            time.sleep(0.05)
+            handler.start_reader()
+        if wg.name == "writers":
+            time.sleep(0.1)
+            handler.start_writer()
     
-    def run(self, handler, pool):
-        # Not inline so we just launch them directly from our MPI threads.
-        if pool.comm.rank == 0:
-            handler.start_server()
-        if pool.comm.rank in self.readers:
-            time.sleep(0.1 + pool.comm.rank/10.0)
-            handler.start_client(READER_TYPE)
-        if pool.comm.rank in self.writers:
-            time.sleep(0.2 + pool.comm.rank/10.0)
-            handler.start_client(WRITER_TYPE)
+    def setup_pool(self):
+        pool = ProcessorPool()
+        pool, workgroup = ProcessorPool.from_sizes(
+           [ (1, "server"),
+             (self.num_readers, "readers"),
+             (self.num_writers, "writers") ]
+        )
+        return pool, workgroup
 
 class RockstarHaloFinder(ParallelAnalysisInterface):
-    def __init__(self, ts, num_readers = 1, num_writers = None, 
-            outbase=None,particle_mass=-1.0,dm_type=1,force_res=None):
+    def __init__(self, ts, num_readers = 1, num_writers = None,
+            outbase="rockstar_halos", dm_type=1, 
+            force_res=None, total_particles=None, dm_only=False):
         r"""Spawns the Rockstar Halo finder, distributes dark matter
         particles and finds halos.
 
         outbase: str
             This is where the out*list files that Rockstar makes should be
             placed. Default is 'rockstar_halos'.
-        particle_mass: float
-            This sets the DM particle mass used in Rockstar.
         dm_type: 1
             In order to exclude stars and other particle types, define
             the dm_type. Default is 1, as Enzo has the DM particle type=1.
             last data snapshot (i.e. the one where time has evolved the
             longest) in the time series:
             ``pf_last.h.get_smallest_dx() * pf_last['mpch']``.
+        total_particles : int
+            If supplied, this is a pre-calculated total number of dark matter
+            particles present in the simulation. For example, this is useful
+            when analyzing a series of snapshots where the number of dark
+            matter particles should not change and this will save some disk
+            access time. If left unspecified, it will
+            be calculated automatically. Default: ``None``.
+        dm_only : boolean
+            If set to ``True``, it will be assumed that there are only dark
+            matter particles present in the simulation. This can save analysis
+            time if this is indeed the case. Default: ``False``.
             
         Returns
         -------
         from yt.mods import *
         import sys
 
-        files = glob.glob('/u/cmoody3/data/a*')
-        files.sort()
-        ts = TimeSeriesData.from_filenames(files)
+        ts = TimeSeriesData.from_filenames('/u/cmoody3/data/a*')
         pm = 7.81769027e+11
-        rh = RockstarHaloFinder(ts, particle_mass=pm)
+        rh = RockstarHaloFinder(ts)
         rh.run()
         """
+        ParallelAnalysisInterface.__init__(self)
         # Decide how we're working.
         if ytcfg.getboolean("yt", "inline") == True:
-            self.runner = InlineRunner(num_writers)
+            self.runner = InlineRunner()
         else:
             self.runner = StandardRunner(num_readers, num_writers)
         self.num_readers = self.runner.num_readers
         # Note that Rockstar does not support subvolumes.
         # We assume that all of the snapshots in the time series
         # use the same domain info as the first snapshots.
-        if not isinstance(ts,TimeSeriesData):
+        if not isinstance(ts, TimeSeriesData):
             ts = TimeSeriesData([ts])
         self.ts = ts
         self.dm_type = dm_type
-        tpf = ts.__iter__().next()
+        self.outbase = outbase
+        if force_res is None:
+            tpf = ts[-1] # Cache a reference
+            self.force_res = tpf.h.get_smallest_dx() * tpf['mpch']
+            # We have to delete now to wipe the hierarchy
+            del tpf
+        else:
+            self.force_res = force_res
+        self.total_particles = total_particles
+        self.dm_only = dm_only
+        # Setup pool and workgroups.
+        self.pool, self.workgroup = self.runner.setup_pool()
+        p = self._setup_parameters(ts)
+        params = self.comm.mpi_bcast(p, root = self.pool['readers'].ranks[0])
+        self.__dict__.update(params)
+        self.handler = rockstar_interface.RockstarInterface(self.ts)
+
+    def _setup_parameters(self, ts):
+        if self.workgroup.name != "readers": return None
+        tpf = ts[0]
         def _particle_count(field, data):
+            if self.dm_only:
+                return np.prod(data["particle_position_x"].shape)
             try:
-                return (data["particle_type"]==dm_type).sum()
+                return (data["particle_type"]==self.dm_type).sum()
             except KeyError:
                 return np.prod(data["particle_position_x"].shape)
-        add_field("particle_count",function=_particle_count, not_in_all=True,
-            particle_type=True)
-        # Get total_particles in parallel.
+        add_field("particle_count", function=_particle_count,
+                  not_in_all=True, particle_type=True)
         dd = tpf.h.all_data()
-        self.total_particles = int(dd.quantities['TotalQuantity']('particle_count')[0])
-        self.hierarchy = tpf.h
-        self.particle_mass = particle_mass 
-        self.center = (tpf.domain_right_edge + tpf.domain_left_edge)/2.0
-        if outbase is None:
-            outbase = 'rockstar_halos'
-        self.outbase = outbase
-        self.particle_mass = particle_mass
-        if force_res is None:
-            self.force_res = ts[-1].h.get_smallest_dx() * ts[-1]['mpch']
-        else:
-            self.force_res = force_res
-        self.left_edge = tpf.domain_left_edge
-        self.right_edge = tpf.domain_right_edge
-        self.center = (tpf.domain_right_edge + tpf.domain_left_edge)/2.0
-        # We set up the workgroups *before* initializing
-        # ParallelAnalysisInterface. Everyone is their own workgroup!
-        self.pool = ProcessorPool()
-        for i in range(ytcfg.getint("yt", "__global_parallel_size")):
-             self.pool.add_workgroup(size=1)
-        ParallelAnalysisInterface.__init__(self)
-        for wg in self.pool.workgroups:
-            if self.pool.comm.rank in wg.ranks:
-                self.workgroup = wg
-        self.handler = rockstar_interface.RockstarInterface(
-                self.ts, dd)
+        # Get DM particle mass.
+        all_fields = set(tpf.h.derived_field_list + tpf.h.field_list)
+        for g in tpf.h._get_objs("grids"):
+            if g.NumberOfParticles == 0: continue
+            if self.dm_only:
+                iddm = Ellipsis
+            elif "particle_type" in all_fields:
+                iddm = g["particle_type"] == self.dm_type
+            else:
+                iddm = Ellipsis
+            particle_mass = g['ParticleMassMsun'][iddm][0] / tpf.hubble_constant
+            break
+        p = {}
+        if self.total_particles is None:
+            # Get total_particles in parallel.
+            p['total_particles'] = int(dd.quantities['TotalQuantity']('particle_count')[0])
+        p['left_edge'] = tpf.domain_left_edge
+        p['right_edge'] = tpf.domain_right_edge
+        p['center'] = (tpf.domain_right_edge + tpf.domain_left_edge)/2.0
+        p['particle_mass'] = particle_mass
+        return p
+
 
     def __del__(self):
-        self.pool.free_all()
+        try:
+            self.pool.free_all()
+        except AttributeError:
+            # This really only acts to cut down on the misleading
+            # error messages when/if this class is called incorrectly
+            # or some other error happens and self.pool hasn't been created
+            # already.
+            pass
 
     def _get_hosts(self):
-        if self.pool.comm.size == 1 or self.pool.comm.rank == 0:
+        if self.comm.rank == 0 or self.comm.size == 1:
             server_address = socket.gethostname()
             sock = socket.socket()
             sock.bind(('', 0))
             del sock
         else:
             server_address, port = None, None
-        self.server_address, self.port = self.pool.comm.mpi_bcast(
+        self.server_address, self.port = self.comm.mpi_bcast(
             (server_address, port))
         self.port = str(self.port)
 
         self.handler.setup_rockstar(self.server_address, self.port,
                     len(self.ts), self.total_particles, 
                     self.dm_type,
-                    parallel = self.pool.comm.size > 1,
+                    parallel = self.comm.size > 1,
                     num_readers = self.num_readers,
                     num_writers = self.num_writers,
                     writing_port = -1,
                     block_ratio = block_ratio,
                     outbase = self.outbase,
-                    force_res=self.force_res,
+                    force_res = self.force_res,
                     particle_mass = float(self.particle_mass),
+                    dm_only = int(self.dm_only),
                     **kwargs)
         # Make the directory to store the halo lists in.
-        if self.pool.comm.rank == 0:
+        if self.comm.rank == 0:
             if not os.path.exists(self.outbase):
-                os.mkdir(self.outbase)
+                os.makedirs(self.outbase)
             # Make a record of which dataset corresponds to which set of
             # output files because it will be easy to lose this connection.
             fp = open(self.outbase + '/pfs.txt', 'w')
                 fp.write(line)
             fp.close()
         # This barrier makes sure the directory exists before it might be used.
-        self.pool.comm.barrier()
-        if self.pool.comm.size == 1:
+        self.comm.barrier()
+        if self.comm.size == 1:
             self.handler.call_rockstar()
         else:
-            # Split up the work.
-            self.runner.split_work(self.pool)
             # And run it!
-            self.runner.run(self.handler, self.pool)
-        self.pool.comm.barrier()
+            self.runner.run(self.handler, self.workgroup)
+        self.comm.barrier()
         self.pool.free_all()
     
     def halo_list(self,file_name='out_0.list'):
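
With the runner rework above, RockstarHaloFinder no longer takes particle_mass (it is measured from the first dataset in _setup_parameters), and StandardRunner splits the MPI job into one server rank plus reader and writer workgroups, so the job size must equal num_readers + num_writers + 1. A usage sketch under those assumptions follows; the dataset glob is hypothetical and the import path assumes the yt 2.x analysis_modules layout:

    # Sketch: run with e.g. "mpirun -np 4 python find_halos.py --parallel";
    # 1 server + 1 reader + 2 writers = 4 MPI ranks, matching the check in
    # StandardRunner.__init__.
    from yt.mods import *
    from yt.analysis_modules.halo_finding.rockstar.api import RockstarHaloFinder

    ts = TimeSeriesData.from_filenames("/path/to/snapshots/DD????/DD????")
    rh = RockstarHaloFinder(ts, num_readers=1, num_writers=2,
                            outbase="rockstar_halos", dm_only=True)
    rh.run()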

yt/analysis_modules/halo_finding/rockstar/rockstar_interface.pyx

 
 cdef import from "server.h" nogil:
     int server()
+    np.int64_t READER_TYPE
+    np.int64_t WRITER_TYPE
 
 cdef import from "client.h" nogil:
     void client(np.int64_t in_type)
 
 cdef import from "meta_io.h":
     void read_particles(char *filename)
-    void output_and_free_halos(np.int64_t id_offset, np.int64_t snap, 
+    void output_halos(np.int64_t id_offset, np.int64_t snap, 
 			   np.int64_t chunk, float *bounds)
 
 cdef import from "config_vars.h":
     np.float64_t AVG_PARTICLE_SPACING
     np.int64_t SINGLE_SNAP
 
-def print_rockstar_settings():
-    # We have to do the config
-    print "FILE_FORMAT =", FILE_FORMAT
-    print "PARTICLE_MASS =", PARTICLE_MASS
+# Forward declare
+cdef class RockstarInterface
 
-    print "MASS_DEFINITION =", MASS_DEFINITION
-    print "MIN_HALO_OUTPUT_SIZE =", MIN_HALO_OUTPUT_SIZE
-    print "FORCE_RES =", FORCE_RES
-
-    print "SCALE_NOW =", SCALE_NOW
-    print "h0 =", h0
-    print "Ol =", Ol
-    print "Om =", Om
-
-    print "GADGET_ID_BYTES =", GADGET_ID_BYTES
-    print "GADGET_MASS_CONVERSION =", GADGET_MASS_CONVERSION
-    print "GADGET_LENGTH_CONVERSION =", GADGET_LENGTH_CONVERSION
-    print "GADGET_SKIP_NON_HALO_PARTICLES =", GADGET_SKIP_NON_HALO_PARTICLES
-    print "RESCALE_PARTICLE_MASS =", RESCALE_PARTICLE_MASS
-
-    print "PARALLEL_IO =", PARALLEL_IO
-    print "PARALLEL_IO_SERVER_ADDRESS =", PARALLEL_IO_SERVER_ADDRESS
-    print "PARALLEL_IO_SERVER_PORT =", PARALLEL_IO_SERVER_PORT
-    print "PARALLEL_IO_WRITER_PORT =", PARALLEL_IO_WRITER_PORT
-    print "PARALLEL_IO_SERVER_INTERFACE =", PARALLEL_IO_SERVER_INTERFACE
-    print "RUN_ON_SUCCESS =", RUN_ON_SUCCESS
-
-    print "INBASE =", INBASE
-    print "FILENAME =", FILENAME
-    print "STARTING_SNAP =", STARTING_SNAP
-    print "NUM_SNAPS =", NUM_SNAPS
-    print "NUM_BLOCKS =", NUM_BLOCKS
-    print "NUM_READERS =", NUM_READERS
-    print "PRELOAD_PARTICLES =", PRELOAD_PARTICLES
-    print "SNAPSHOT_NAMES =", SNAPSHOT_NAMES
-    print "LIGHTCONE_ALT_SNAPS =", LIGHTCONE_ALT_SNAPS
-    print "BLOCK_NAMES =", BLOCK_NAMES
-
-    print "OUTBASE =", OUTBASE
-    print "OVERLAP_LENGTH =", OVERLAP_LENGTH
-    print "NUM_WRITERS =", NUM_WRITERS
-    print "FORK_READERS_FROM_WRITERS =", FORK_READERS_FROM_WRITERS
-    print "FORK_PROCESSORS_PER_MACHINE =", FORK_PROCESSORS_PER_MACHINE
-
-    print "OUTPUT_FORMAT =", OUTPUT_FORMAT
-    print "DELETE_BINARY_OUTPUT_AFTER_FINISHED =", DELETE_BINARY_OUTPUT_AFTER_FINISHED
-    print "FULL_PARTICLE_CHUNKS =", FULL_PARTICLE_CHUNKS
-    print "BGC2_SNAPNAMES =", BGC2_SNAPNAMES
-
-    print "BOUND_PROPS =", BOUND_PROPS
-    print "BOUND_OUT_TO_HALO_EDGE =", BOUND_OUT_TO_HALO_EDGE
-    print "DO_MERGER_TREE_ONLY =", DO_MERGER_TREE_ONLY
-    print "IGNORE_PARTICLE_IDS =", IGNORE_PARTICLE_IDS
-    print "TRIM_OVERLAP =", TRIM_OVERLAP
-    print "ROUND_AFTER_TRIM =", ROUND_AFTER_TRIM
-    print "LIGHTCONE =", LIGHTCONE
-    print "PERIODIC =", PERIODIC
-
-    print "LIGHTCONE_ORIGIN =", LIGHTCONE_ORIGIN[0]
-    print "LIGHTCONE_ORIGIN[1] =", LIGHTCONE_ORIGIN[1]
-    print "LIGHTCONE_ORIGIN[2] =", LIGHTCONE_ORIGIN[2]
-    print "LIGHTCONE_ALT_ORIGIN =", LIGHTCONE_ALT_ORIGIN[0]
-    print "LIGHTCONE_ALT_ORIGIN[1] =", LIGHTCONE_ALT_ORIGIN[1]
-    print "LIGHTCONE_ALT_ORIGIN[2] =", LIGHTCONE_ALT_ORIGIN[2]
-
-    print "LIMIT_CENTER =", LIMIT_CENTER[0]
-    print "LIMIT_CENTER[1] =", LIMIT_CENTER[1]
-    print "LIMIT_CENTER[2] =", LIMIT_CENTER[2]
-    print "LIMIT_RADIUS =", LIMIT_RADIUS
-
-    print "SWAP_ENDIANNESS =", SWAP_ENDIANNESS
-    print "GADGET_VARIANT =", GADGET_VARIANT
-
-    print "FOF_FRACTION =", FOF_FRACTION
-    print "FOF_LINKING_LENGTH =", FOF_LINKING_LENGTH
-    print "INCLUDE_HOST_POTENTIAL_RATIO =", INCLUDE_HOST_POTENTIAL_RATIO
-    print "DOUBLE_COUNT_SUBHALO_MASS_RATIO =", DOUBLE_COUNT_SUBHALO_MASS_RATIO
-    print "TEMPORAL_HALO_FINDING =", TEMPORAL_HALO_FINDING
-    print "MIN_HALO_PARTICLES =", MIN_HALO_PARTICLES
-    print "UNBOUND_THRESHOLD =", UNBOUND_THRESHOLD
-    print "ALT_NFW_METRIC =", ALT_NFW_METRIC
-
-    print "TOTAL_PARTICLES =", TOTAL_PARTICLES
-    print "BOX_SIZE =", BOX_SIZE
-    print "OUTPUT_HMAD =", OUTPUT_HMAD
-    print "OUTPUT_PARTICLES =", OUTPUT_PARTICLES
-    print "OUTPUT_LEVELS =", OUTPUT_LEVELS
-    print "DUMP_PARTICLES =", DUMP_PARTICLES[0]
-    print "DUMP_PARTICLES[1] =", DUMP_PARTICLES[1]
-    print "DUMP_PARTICLES[2] =", DUMP_PARTICLES[2]
-
-    print "AVG_PARTICLE_SPACING =", AVG_PARTICLE_SPACING
-    print "SINGLE_SNAP =", SINGLE_SNAP
-
-cdef class RockstarInterface
 cdef void rh_read_particles(char *filename, particle **p, np.int64_t *num_p) with gil:
     global SCALE_NOW
     cdef np.float64_t conv[6], left_edge[6]
     block = int(str(filename).rsplit(".")[-1])
     n = rh.block_ratio
 
-    all_grids = pf.h.grids
     SCALE_NOW = 1.0/(pf.current_redshift+1.0)
     # Now we want to grab data from only a subset of the grids for each reader.
-    if NUM_BLOCKS == 1:
-        grids = all_grids
-    else:
-        if ytcfg.getboolean("yt", "inline") == False:
-            fnames = np.array([g.filename for g in all_grids])
-            sort = fnames.argsort()
-            grids = np.array_split(all_grids[sort], NUM_BLOCKS)[block]
-        else:
-            # We must be inline, grap only the local grids.
-            grids  = [g for g in all_grids if g.proc_num ==
-                          ytcfg.getint('yt','__topcomm_parallel_rank')]
-    
     all_fields = set(pf.h.derived_field_list + pf.h.field_list)
 
     # First we need to find out how many this reader is going to read in
     # if the number of readers > 1.
     if NUM_BLOCKS > 1:
         local_parts = 0
-        for g in grids:
+        for g in pf.h._get_objs("grids"):
             if g.NumberOfParticles == 0: continue
-            if "particle_type" in all_fields:
-                #iddm = dd._get_data_from_grid(g, "particle_type")==rh.dm_type
+            if rh.dm_only:
+                iddm = Ellipsis
+            elif "particle_type" in all_fields:
                 iddm = g["particle_type"] == rh.dm_type
             else:
                 iddm = Ellipsis
     left_edge[2] = pf.domain_left_edge[2]
     left_edge[3] = left_edge[4] = left_edge[5] = 0.0
     pi = 0
-    for g in grids:
+    for g in pf.h._get_objs("grids"):
         if g.NumberOfParticles == 0: continue
-        if "particle_type" in all_fields:
+        if rh.dm_only:
+            iddm = Ellipsis
+        elif "particle_type" in all_fields:
             iddm = g["particle_type"] == rh.dm_type
         else:
             iddm = Ellipsis
     cdef public int block_ratio
     cdef public int dm_type
     cdef public int total_particles
+    cdef public int dm_only
 
-    def __cinit__(self, ts, data_source):
+    def __cinit__(self, ts):
         self.ts = ts
         self.tsl = ts.__iter__() #timseries generator used by read
-        self.data_source = data_source
 
     def setup_rockstar(self, char *server_address, char *server_port,
                        int num_snaps, np.int64_t total_particles,
                        int dm_type,
-                       np.float64_t particle_mass = -1.0,
+                       np.float64_t particle_mass,
                        int parallel = False, int num_readers = 1,
                        int num_writers = 1,
                        int writing_port = -1, int block_ratio = 1,
                        int periodic = 1, force_res=None,
-                       int min_halo_size = 25, outbase = "None"):
+                       int min_halo_size = 25, outbase = "None",
+                       int dm_only = 0):
         global PARALLEL_IO, PARALLEL_IO_SERVER_ADDRESS, PARALLEL_IO_SERVER_PORT
         global FILENAME, FILE_FORMAT, NUM_SNAPS, STARTING_SNAP, h0, Ol, Om
         global BOX_SIZE, PERIODIC, PARTICLE_MASS, NUM_BLOCKS, NUM_READERS
         MIN_HALO_OUTPUT_SIZE=min_halo_size
         TOTAL_PARTICLES = total_particles
         self.block_ratio = block_ratio
+        self.dm_only = dm_only
         
         tpf = self.ts[0]
         h0 = tpf.hubble_constant
             #workaround is to make a new directory
             OUTBASE = outbase 
 
-        if particle_mass < 0:
-            particle_mass = tpf.h.grids[0]["ParticleMassMsun"][0] / h0
         PARTICLE_MASS = particle_mass
         PERIODIC = periodic
         BOX_SIZE = (tpf.domain_right_edge[0] -
     def call_rockstar(self):
         read_particles("generic")
         rockstar(NULL, 0)
-        output_and_free_halos(0, 0, 0, NULL)
+        output_halos(0, 0, 0, NULL)
 
     def start_server(self):
         with nogil:
             server()
 
-    def start_client(self, in_type):
-        in_type = np.int64(in_type)
+    def start_reader(self):
+        cdef np.int64_t in_type = np.int64(READER_TYPE)
         client(in_type)
+
+    def start_writer(self):
+        cdef np.int64_t in_type = np.int64(WRITER_TYPE)
+        client(in_type)