pypy / lib_pypy / distributed /

Full commit

import py
from socket import socket

XXX needs import adaptation as 'green' is removed from py lib for years 
from import decodemessage, message
from socket import socket, AF_INET, SOCK_STREAM
import marshal
import sys

TRACE = False
def trace(msg):
    if TRACE:
        print >>sys.stderr, msg

class Finished(Exception):

class SocketWrapper(object):
    def __init__(self, conn):
        self.buffer = ""
        self.conn = conn

class ReceiverWrapper(SocketWrapper):
    def receive(self):
        msg, self.buffer = decodemessage(self.buffer)
        while msg is None:
            data = self.conn.recv(8192)
            if not data:
                raise Finished()
            self.buffer += data
            msg, self.buffer = decodemessage(self.buffer)
        assert msg[0] == 'c'
        trace("received %s" % msg[1])
        return marshal.loads(msg[1])

class SenderWrapper(SocketWrapper):
    def send(self, data):
        trace("sending %s" % (data,))
        self.conn.sendall(message('c', marshal.dumps(data)))

def socket_listener(address, socket=socket):
    s = socket(AF_INET, SOCK_STREAM)
    print "Waiting for connection on %s" % (address,)
    conn, addr = s.accept()
    print "Connected from %s" % (addr,)

    return SenderWrapper(conn).send, ReceiverWrapper(conn).receive

def socket_loop(address, to_export, socket=socket):
    from distributed import RemoteProtocol, remote_loop
        send, receive = socket_listener(address, socket)
        remote_loop(RemoteProtocol(send, receive, to_export))
    except Finished:

def socket_connecter(address, socket=socket):
    s = socket(AF_INET, SOCK_STREAM)
    print "Connecting %s" % (address,)
    return SenderWrapper(s).send, ReceiverWrapper(s).receive

def connect(address, socket=socket):
    from import RemoteView
    from distributed import RemoteProtocol
    return RemoteView(RemoteProtocol(*socket_connecter(address, socket)))

def spawn_remote_side(code, gw):
    """ A very simple wrapper around greenexecnet to allow
    spawning a remote side of lib/distributed
    from distributed import RemoteProtocol
    extra = str(py.code.Source("""
    from distributed import remote_loop, RemoteProtocol
    remote_loop(RemoteProtocol(channel.send, channel.receive, globals()))
    channel = gw.remote_exec(code + "\n" + extra)
    return RemoteProtocol(channel.send, channel.receive)