# Source
#
# yt.rife / main_engine.py
#
# Full commit
from functools import wraps
import inspect
from yt.mods import *
from mpi4py import MPI
import time
import Pyro4
import uuid

def requires_pf(func):
    """Decorate a method so it refuses to run while ``self.pf`` is ``None``.

    The wrapper checks ``self.pf`` on every call.  When it is ``None`` a
    ``RuntimeError`` is raised; otherwise the call is forwarded to ``func``
    with the same arguments and its return value is passed back.

    Parameters
    ----------
    func : callable
        Method to guard.  Its first positional argument is the instance
        whose ``pf`` attribute is inspected.

    Returns
    -------
    callable
        Wrapper that keeps ``func``'s name and docstring (``functools.wraps``).

    Raises
    ------
    RuntimeError
        Raised by the wrapper, not by this decorator, when ``self.pf`` is
        ``None`` at call time.
    """
    @wraps(func)
    def wrap(self, *args, **kwargs):
        if self.pf is None:
            # Name the method so the caller can tell which call came too early.
            raise RuntimeError(
                "%s() was called while self.pf is None" % func.__name__)
        return func(self, *args, **kwargs)
    return wrap

def broadcasted_action(func):
    """Decorate a method so that rank 0 announces the call before running it.

    When the instance's ``comm.rank`` is 0 the wrapper builds a message
    ``{'type': <method name>, 'args': <keyword arguments>}``, passes it to
    ``self.send`` and logs it, then runs ``func``.  On any other rank the
    wrapper runs ``func`` directly without sending anything.

    Parameters
    ----------
    func : callable
        Method to wrap.  Its first parameter must be named ``self``; the
        instance must provide ``comm.rank`` and ``send(msg)``.

    Returns
    -------
    callable
        Wrapper that keeps ``func``'s name and docstring and returns whatever
        ``func`` returns.
    """
    @wraps(func)
    def sender(*args, **kwargs):
        self = args[0]
        if self.comm.rank == 0:
            # Resolve positionals and defaults to parameter names so the
            # message can be replayed as ``method(**msg['args'])``.
            call_args = inspect.getcallargs(func, *args, **kwargs)
            # The instance itself is not part of the message.
            call_args.pop("self")
            # ``__name__`` exists on Python 2 and 3; ``func_name`` is a
            # Python-2-only alias for it.
            msg = dict(type = func.__name__, args = call_args)
            self.send(msg)
            mylog.debug("Sending message: %s, %s", msg['type'], msg['args'])
        return func(*args, **kwargs)
    return sender

class LockstepProxy(object):
    """Base class that sends and replays method-call messages over ``comm``.

    A message is a mapping with a ``'type'`` entry (the name of a method on
    this object) and an ``'args'`` entry (keyword arguments for that method).
    """
    # Loop condition for runwait(); nothing in this class sets it to False.
    _run = True

    def __init__(self, comm = None):
        """Store the communicator used for broadcasts.

        Parameters
        ----------
        comm : object, optional
            Object providing ``mpi_bcast``.  When omitted, the last entry of
            yt's ``communication_system.communicators`` is used.
        """
        if comm is not None:
            self.comm = comm
            return
        # Only imported when the caller did not supply a communicator.
        from yt.utilities.parallel_tools.parallel_analysis_interface import (
            communication_system)
        self.comm = communication_system.communicators[-1]

    def send(self, msg):
        """Hand ``msg`` to ``comm.mpi_bcast``; the result is discarded."""
        self.comm.mpi_bcast(msg)

    def receive(self, msg):
        """Log ``msg`` and call the method it names with its keyword arguments.

        Raises whatever the lookup or the called method raises; for example
        ``AttributeError`` when ``msg['type']`` is not an attribute of self.
        """
        kind, call_args = msg['type'], msg['args']
        mylog.debug("Received message: %s, %s", kind, call_args)
        handler = getattr(self, kind)
        handler(**call_args)

    def runwait(self):
        """Replay broadcast messages for as long as ``self._run`` is true.

        Each pass calls ``comm.mpi_bcast(None)`` (presumably blocking until
        another rank broadcasts -- confirm against the communicator) and
        forwards any non-``None`` result to ``receive``.
        """
        while self._run:
            incoming = self.comm.mpi_bcast(None)
            if incoming is None:
                continue
            self.receive(incoming)

class VolumeRenderingHandler(LockstepProxy):
    """Lockstep proxy that loads a dataset and drives a rendering camera.

    Every method is a ``broadcasted_action``: when called on rank 0 the call
    is announced through ``send`` before it runs locally.  Methods that use
    ``self.cam`` are additionally guarded by ``requires_pf`` so that calling
    them before ``load`` raises ``RuntimeError`` *before* anything is
    broadcast, instead of failing on the missing ``cam`` attribute afterwards.

    ``requires_pf`` is applied outside ``broadcasted_action`` on purpose:
    ``broadcasted_action`` inspects the signature of the function it wraps,
    so it has to wrap the real method, not the generic ``requires_pf``
    wrapper.
    """
    # Loaded dataset; None until load() succeeds (this is what requires_pf
    # checks).
    pf = None

    @broadcasted_action
    def load(self, fn, resolution = 1024):
        """Load ``fn`` and create a camera centred on the domain.

        Parameters
        ----------
        fn : str
            Path handed to yt's ``load``.
        resolution : int, optional
            Resolution argument passed to ``pf.h.camera``.

        Raises
        ------
        IOError
            When ``load(fn)`` returns ``None``.
        """
        pf = load(fn)
        if pf is None:
            raise IOError(fn)
        center = (pf.domain_right_edge + pf.domain_left_edge)/2.0
        # The trailing None is presumably the transfer-function slot;
        # render() refuses to run until isocontours() has set one.
        cam = pf.h.camera(center, [1.0, 1.0, 1.0],
                          1.0/pf['unitary'], resolution, None)
        # Commit both attributes together so a failure above never leaves
        # self.pf set without a matching self.cam.
        self.pf, self.cam = pf, cam

    @requires_pf
    @broadcasted_action
    def switch_view(self, normal_vector, width, center, north_vectors):
        """Forward the new view parameters to ``self.cam.switch_view``."""
        self.cam.switch_view(normal_vector, width, center, north_vectors)

    @requires_pf
    @broadcasted_action
    def render(self):
        """Take a snapshot with the current camera, print it and return it.

        Raises
        ------
        RuntimeError
            When no dataset is loaded, or when the camera has no transfer
            function yet.
        """
        # TODO (from the original author): broadcast the current view to the
        # other processors, if any are running, before taking the snapshot.
        # It is available as cam.normal_vector, cam.width, cam.center and
        # cam.north_vector.
        # NOTE(review): unlike the requires_pf guard, this check runs after
        # the call has been broadcast, so every rank raises here.
        if self.cam.transfer_function is None:
            raise RuntimeError(
                "render() needs a transfer function; call isocontours() first")
        sn = self.cam.snapshot()
        # Single-argument call form prints the same text on Python 2 and 3.
        print(sn)
        return sn

    @requires_pf
    @broadcasted_action
    def isocontours(self, limits, nlayers, width):
        """Install a ``ColorTransferFunction`` over ``limits`` with layers.

        ``nlayers`` and ``width`` are passed to ``add_layers`` as the layer
        count and the ``w`` keyword respectively.
        """
        self.cam.transfer_function = ColorTransferFunction(limits)
        self.cam.transfer_function.add_layers(nlayers, w=width)

if __name__ == "__main__":
    # Every MPI process builds its own handler.  Rank 0 exposes it through a
    # Pyro4 daemon; all other ranks sit in runwait() replaying broadcasts.
    VRH = VolumeRenderingHandler()
    if MPI.COMM_WORLD.rank == 0:
        # Fresh random key for this run, printed to stdout (presumably so a
        # client can be configured with the same key).
        Pyro4.config.HMAC_KEY = uuid.uuid4().hex
        # %-formatting with a single-argument print gives the same output
        # line as the Python 2 print statement and also parses on Python 3.
        print("HMAC KEY %s" % Pyro4.config.HMAC_KEY)
        Pyro4.Daemon.serveSimple(
            {VRH: "yt.volume_rendering_handler"},
            ns=False)
    else:
        VRH.runwait()