Source

pypy / lib_pypy / distributed / protocol.py

The branch 'stm-gc' does not exist.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447

""" Distributed controller(s) for use with transparent proxy objects

First idea:

1. We use py.execnet to create a connection to wherever
2. We run some code there (RSync in advance makes some sense)
3. We access remote objects like normal ones, with a special protocol

Local side:
  - Request an object from the remote side's global namespace, simply as
    --- request(name) --->
  - Receive an object, encoded in the protocol described below, which is
    constructed as a shallow copy of the remote type.

    Shallow copy is defined as follows:

    - for interp-level object that we know we can provide transparent proxy
      we just do that

    - for others we fake or fail depending on object

    - for user objects, we create a class which fakes all attributes of
      a class as transparent proxies of remote objects, we create an instance
      of that class and populate __dict__

    - for immutable types, we just copy that

Remote side:
  - we run code, whatever we like
  - additionally, we've got thread exporting stuff (or just exporting
    globals, whatever)
  - for every object, we just send an object, or provide a protocol for
    sending it in a different way.

"""

try:
    from __pypy__ import tproxy as proxy
    from __pypy__ import get_tproxy_controller
except ImportError:
    raise ImportError("Cannot work without transparent proxy functionality")

from distributed.objkeeper import ObjKeeper
from distributed import faker
import sys

class ObjectNotFound(Exception):
    """ Raised by remote_loop when a 'finished_error' reply arrives, which
    the peer sends when a requested name is missing from its exported names
    """

# XXX We do not do any garbage collection. We'll need it at some point

"""
TODO list:

1. Garbage collection - we would probably like to use weakrefs, but
   since they're not working perfectly in pypy, let's leave it alone for now
2. Some error handling - exceptions are working, there are still some
   applications where it all explodes.
3. Support inheritance and recursive types
"""

from __pypy__ import internal_repr

import types
from marshal import dumps
import exceptions

# just placeholders for letter_types value
class RemoteBase(object):
    """ Empty marker class; stands for the 'reg' letter in
    AbstractProtocol.letter_types
    """

class DataDescriptor(object):
    """ Empty marker class; stands for the 'set' letter in
    AbstractProtocol.letter_types
    """

class NonDataDescriptor(object):
    """ Empty marker class; stands for the 'get' letter in
    AbstractProtocol.letter_types
    """
# end of placeholders

class AbstractProtocol(object):
    """ Base class converting objects to and from their wire representation

    wrap() turns a local object into the bare letter 'n' (for None) or a
    (letter, payload) tuple; unwrap() is the inverse and builds transparent
    proxies for objects that stay on the other side.  The letters are the
    keys of letter_types.  Subclasses have to implement perform().
    """
    # values of these types are copied into the message as they are
    immutable_primitives = (str, int, float, long, unicode, bool, types.NotImplementedType)
    # objects of these types are registered in the keeper and sent by id
    mutable_primitives = (list, dict, types.FunctionType, types.FrameType, types.TracebackType,
        types.CodeType)
    # reverse view of the ``exceptions`` module namespace: value --> name
    exc_dir = dict((val, name) for name, val in exceptions.__dict__.iteritems())

    # letter --> type it stands for inside a wrapped message.
    # Note: list is spelled 'lst'; 'l' means long.  (An earlier duplicate
    # 'l' : list entry was silently overwritten by 'l' : long and is gone.)
    letter_types = {
        'd' : dict,
        'c' : types.CodeType,
        't' : tuple,
        'e' : Exception,
        'ex': exceptions, # for instances
        'i' : int,
        'b' : bool,
        'f' : float,
        'u' : unicode,
        'l' : long,
        's' : str,
        'ni' : types.NotImplementedType,
        'n' : types.NoneType,
        'lst' : list,
        'fun' : types.FunctionType,
        'cus' : object,
        'meth' : types.MethodType,
        'type' : type,
        'tp' : None,
        'fr' : types.FrameType,
        'tb' : types.TracebackType,
        'reg' : RemoteBase,
        'get' : NonDataDescriptor,
        'set' : DataDescriptor,
    }
    # type --> letter; the assert guarantees the mapping is one-to-one
    type_letters = dict([(value, key) for key, value in letter_types.items()])
    assert len(type_letters) == len(letter_types)

    def __init__(self, exported_names=None):
        """ Create the protocol and its ObjKeeper

        exported_names -- mapping handed over to ObjKeeper.  When omitted a
        fresh empty dict is created for this instance, so that protocols
        never share one default mapping.
        """
        if exported_names is None:
            exported_names = {}
        self.keeper = ObjKeeper(exported_names)

    def wrap(self, obj):
        """ Wrap an object as something prepared for sending

        Returns 'n' for None and a (letter, payload) tuple otherwise.
        Objects that cannot be copied are registered in self.keeper and
        represented by the id the keeper returns.
        """
        def is_element(x, iterable):
            # membership test that tolerates objects refusing to be hashed
            # or compared
            try:
                return x in iterable
            except (TypeError, ValueError):
                return False

        tp = type(obj)
        ctrl = get_tproxy_controller(obj)
        if ctrl:
            # obj is already a transparent proxy: refer to the object
            # behind it instead of wrapping the proxy once more
            return "tp", self.keeper.get_remote_object(ctrl)
        elif obj is None:
            return self.type_letters[tp]
        elif tp in self.immutable_primitives:
            # simple, immutable object, just copy
            return (self.type_letters[tp], obj)
        elif hasattr(obj, '__class__') and obj.__class__ in self.exc_dir:
            # instance of a class from the ``exceptions`` module:
            # send the class name together with the wrapped args
            return (self.type_letters[Exception], (self.exc_dir[obj.__class__], \
                self.wrap(obj.args)))
        elif is_element(obj, self.exc_dir): # weird hashing problems
            # obj itself is a value of the ``exceptions`` namespace
            return (self.type_letters[exceptions], self.exc_dir[obj])
        elif tp is tuple:
            # we just pack all of the items
            return ('t', tuple([self.wrap(elem) for elem in obj]))
        elif tp in self.mutable_primitives:
            id = self.keeper.register_object(obj)
            return (self.type_letters[tp], id)
        elif tp is type:
            try:
                return "reg", self.keeper.reverse_remote_types[obj]
            except KeyError:
                pass
            try:
                # a type that has a letter of its own is sent as that letter
                return self.type_letters[tp], self.type_letters[obj]
            except KeyError:
                id = self.register_type(obj)
                return (self.type_letters[tp], id)
        elif tp is types.MethodType:
            w_class = self.wrap(obj.im_class)
            w_func = self.wrap(obj.im_func)
            w_self = self.wrap(obj.im_self)
            return (self.type_letters[tp], (w_class, \
                self.wrap(obj.im_func.func_name), w_func, w_self))
        else:
            # anything else: register the object, send its type and its id
            id = self.keeper.register_object(obj)
            w_tp = self.wrap(tp)
            return ("cus", (w_tp, id))

    def unwrap(self, data):
        """ Unwrap an object, i.e. invert wrap()

        data -- 'n' or a (letter, payload) tuple as produced by wrap()

        Raises KeyError for a letter missing from letter_types and
        NotImplementedError for a letter whose type has no branch below.
        """
        if data == 'n':
            return None
        tp_letter, obj_data = data
        tp = self.letter_types[tp_letter]
        if tp is None:
            # 'tp': look the object up in our own keeper
            return self.keeper.get_object(obj_data)
        elif tp is RemoteBase:
            return self.keeper.exported_types_reverse[obj_data]
        elif tp in self.immutable_primitives:
            return obj_data # this is the object
        elif tp is tuple:
            return tuple([self.unwrap(i) for i in obj_data])
        elif tp in self.mutable_primitives:
            id = obj_data
            ro = RemoteBuiltinObject(self, id)
            self.keeper.register_remote_object(ro.perform, id)
            p = proxy(tp, ro.perform)
            ro.obj = p
            return p
        elif tp is Exception:
            cls_name, w_args = obj_data
            # w_args is the wrapped ``args`` tuple of the original instance;
            # pass its items as separate arguments so that ``.args`` of the
            # rebuilt exception equals the original ``.args``
            return getattr(exceptions, cls_name)(*self.unwrap(w_args))
        elif tp is exceptions:
            cls_name = obj_data
            return getattr(exceptions, cls_name)
        elif tp is types.MethodType:
            w_class, w_name, w_func, w_self = obj_data
            tp = self.unwrap(w_class)
            name = self.unwrap(w_name)
            self_ = self.unwrap(w_self)
            if self_ is not None:
                if tp is None:
                    setattr(self_, name, classmethod(self.unwrap(w_func)))
                    return getattr(self_, name)
                return getattr(tp, name).__get__(self_, tp)
            func = self.unwrap(w_func)
            setattr(tp, name, func)
            return getattr(tp, name)
        elif tp is type:
            # a string payload is a letter of letter_types, anything else
            # is an id for get_type()
            if isinstance(obj_data, str):
                return self.letter_types[obj_data]
            return self.get_type(obj_data)
        elif tp is DataDescriptor:
            return faker.unwrap_getset_descriptor(self, obj_data)
        elif tp is NonDataDescriptor:
            return faker.unwrap_get_descriptor(self, obj_data)
        elif tp is object:
            # we need to create a proper type
            w_tp, id = obj_data
            real_tp = self.unwrap(w_tp)
            ro = RemoteObject(self, id)
            self.keeper.register_remote_object(ro.perform, id)
            p = proxy(real_tp, ro.perform)
            ro.obj = p
            return p
        else:
            raise NotImplementedError("Cannot unwrap %s" % (data,))

    def perform(self, *args, **kwargs):
        """ Execute an operation on a kept object; subclasses must override
        """
        raise NotImplementedError("Abstract only protocol")

    # some simple wrappers
    def pack_args(self, args, kwargs):
        """ Wrap positional and keyword arguments, return (list, dict)
        """
        return self.pack_list(args), self.pack_dict(kwargs)

    def pack_list(self, lst):
        """ Return a list with every item of lst wrapped
        """
        return [self.wrap(i) for i in lst]

    def pack_dict(self, d):
        """ Return a dict with every key and value of d wrapped
        """
        return dict([(self.wrap(key), self.wrap(val)) for key, val in d.items()])

    def unpack_args(self, args, kwargs):
        """ Inverse of pack_args
        """
        return self.unpack_list(args), self.unpack_dict(kwargs)

    def unpack_list(self, lst):
        """ Return a list with every item of lst unwrapped
        """
        return [self.unwrap(i) for i in lst]

    def unpack_dict(self, d):
        """ Return a dict with every key and value of d unwrapped
        """
        return dict([(self.unwrap(key), self.unwrap(val)) for key, val in d.items()])

    def register_type(self, tp):
        """ Register tp in the keeper, return whatever the keeper returns
        """
        return self.keeper.register_type(self, tp)

    def get_type(self, id):
        """ Ask the keeper for the type stored under id
        """
        return self.keeper.get_type(id)

    
class LocalProtocol(AbstractProtocol):
    """ Trivial in-process protocol, for testing purposes only
    """
    def __init__(self):
        super(LocalProtocol, self).__init__()
        # registered types; the position in this list is the type id
        self.types = []

    def perform(self, id, name, *args, **kwargs):
        """ Call method `name` of the kept object `id` with the given arguments
        """
        target = self.keeper.get_object(id)
        # the arguments are packed and then unpacked again, so that tests
        # go through the same conversions as a real connection would
        w_args, w_kwargs = self.pack_args(args, kwargs)
        assert isinstance(name, str)
        # the result is thrown away: marshal is only invoked to see whether
        # it accepts the packed arguments
        dumps((w_args, w_kwargs))
        call_args, call_kwargs = self.unpack_args(w_args, w_kwargs)
        method = getattr(target, name)
        return method(*call_args, **call_kwargs)

    def register_type(self, tp):
        """ Remember tp and return its index in self.types as the id
        """
        new_id = len(self.types)
        self.types.append(tp)
        return new_id

    def get_type(self, id):
        """ Return the type previously registered under id
        """
        return self.types[id]

def remote_loop(protocol):
    # the simplest version possible, without any concurrency and such
    wrap = protocol.wrap
    unwrap = protocol.unwrap
    send = protocol.send
    receive = protocol.receive
    # we need this for wrap/unwrap
    while 1:
        command, data = receive()
        if command == 'get':
            try:
                item = protocol.keeper.exported_names[data]
            except KeyError:
                send(("finished_error",data))
            else:
                # XXX wrapping problems catching? do we have any?
                send(("finished", wrap(item)))
        elif command == 'call':
            id, name, args, kwargs = data
            args, kwargs = protocol.unpack_args(args, kwargs)
            try:
                retval = getattr(protocol.keeper.get_object(id), name)(*args, **kwargs)
            except:
                send(("raised", wrap(sys.exc_info())))
            else:
                send(("finished", wrap(retval)))
        elif command == 'finished':
            return unwrap(data)
        elif command == 'finished_error':
            raise ObjectNotFound("Cannot find name %s" % (data,))
        elif command == 'raised':
            exc, val, tb = unwrap(data)
            raise exc, val, tb
        elif command == 'type_reg':
            protocol.keeper.fake_remote_type(protocol, data)
        elif command == 'force':
            obj = protocol.keeper.get_object(data)
            w_obj = protocol.pack(obj)
            send(("forced", w_obj))
        elif command == 'forced':
            obj = protocol.unpack(data)
            return obj
        elif command == 'desc_get':
            name, w_obj, w_type = data
            obj = protocol.unwrap(w_obj)
            type_ = protocol.unwrap(w_type)
            if obj:
                type__ = type(obj)
            else:
                type__ = type_
            send(('finished', protocol.wrap(getattr(type__, name).__get__(obj, type_))))

        elif command == 'desc_set':
            name, w_obj, w_value = data
            obj = protocol.unwrap(w_obj)
            value = protocol.unwrap(w_value)
            getattr(type(obj), name).__set__(obj, value)
            send(('finished', protocol.wrap(None)))
        elif command == 'remote_keys':
            keys = protocol.keeper.exported_names.keys()
            send(('finished', protocol.wrap(keys)))
        else:
            raise NotImplementedError("command %s" % command)

class RemoteProtocol(AbstractProtocol):
    #def __init__(self, gateway, remote_code):
    #    self.gateway = gateway
    def __init__(self, send, receive, exported_names={}):
        super(RemoteProtocol, self).__init__(exported_names)
        #self.exported_names = exported_names
        self.send = send
        self.receive = receive
        #self.type_cache = {}
        #self.type_id = 0
        #self.remote_types = {}
    
    def perform(self, id, name, *args, **kwargs):
        args, kwargs = self.pack_args(args, kwargs)
        self.send(('call', (id, name, args, kwargs)))
        try:
            retval = remote_loop(self)
        except:
            e, val, tb = sys.exc_info()
            raise e, val, tb.tb_next.tb_next
        return retval
    
    def get_remote(self, name):
        self.send(("get", name))
        retval = remote_loop(self)
        return retval
    
    def force(self, id):
        self.send(("force", id))
        retval = remote_loop(self)
        return retval
    
    def pack(self, obj):
        if isinstance(obj, list):
            return "l", self.pack_list(obj)
        elif isinstance(obj, dict):
            return "d", self.pack_dict(obj)
        else:
            raise NotImplementedError("Cannot pack %s" % obj)
        
    def unpack(self, data):
        letter, w_obj = data
        if letter == 'l':
            return self.unpack_list(w_obj)
        elif letter == 'd':
            return self.unpack_dict(w_obj)
        else:
            raise NotImplementedError("Cannot unpack %s" % (data,))

    def get(self, name, obj, type):
        self.send(("desc_get", (name, self.wrap(obj), self.wrap(type))))
        return remote_loop(self)

    def set(self, obj, value):
        self.send(("desc_set", (name, self.wrap(obj), self.wrap(value))))

    def remote_keys(self):
        self.send(("remote_keys",None))
        return remote_loop(self)

class RemoteObject(object):
    """ Handle for an object known to `protocol` under `id`; its perform
    method forwards every operation to the protocol
    """
    def __init__(self, protocol, id):
        self.id = id
        self.protocol = protocol

    def perform(self, name, *args, **kwargs):
        """ Forward operation `name` with its arguments, prefixed by our id
        """
        forward = self.protocol.perform
        return forward(self.id, name, *args, **kwargs)

class RemoteBuiltinObject(RemoteObject):
    """ RemoteObject variant that serves comparisons from a forced copy

    self.obj is not set here; AbstractProtocol.unwrap assigns it after
    creating the proxy.
    """
    def __init__(self, protocol, id):
        super(RemoteBuiltinObject, self).__init__(protocol, id)
        self.forced = False

    def perform(self, name, *args, **kwargs):
        # XXX: Check who really goes here
        if not self.forced:
            if name not in ('__eq__', '__ne__', '__lt__', '__gt__', '__ge__',
                            '__le__', '__cmp__'):
                # everything but comparisons is executed by the protocol
                return self.protocol.perform(self.id, name, *args, **kwargs)
            # comparisons: replace self.obj by what protocol.force returns
            # and compare against that
            self.obj = self.protocol.force(self.id)
        return getattr(self.obj, name)(*args, **kwargs)

def test_env(exported_names):
    """ Set up two RemoteProtocol instances connected by stackless channels

    One protocol is created with `exported_names` and handed to a tasklet
    running remote_loop; the other one, using the same two channels in the
    opposite direction, is returned.
    """
    from stackless import channel, tasklet, run
    inp = channel()
    out = channel()
    serving_side = RemoteProtocol(inp.send, out.receive, exported_names)
    t = tasklet(remote_loop)(serving_side)

    # For debugging, out.send / inp.receive below can be replaced by small
    # functions that print each message before passing it on.
    return RemoteProtocol(out.send, inp.receive)