Commits

Anonymous committed 6ad8a29

Implemented remote wsgi actor calls

  • Participants
  • Parent commits 49f78b7

Comments (0)

Files changed (2)

               addr.call('test') could be written as addr.test()
               
         """
-        print "getattr addr:",self.actor_id,"method",method
         f = lambda message=None,timeout=None : self.call(method,message,timeout)
         return f
         
         conn.request('POST', parsed[2], json.dumps(message, default=handle_address))
         resp = conn.getresponse()
 
+    def call(self, method, message=None, timeout=None):
+        """Send a message to the remote Actor this object addresses.
+        Wait for a result. If a timeout in seconds is passed, raise
+        eventlet.TimeoutError if no result is returned in less than the timeout.
+        """
+        message_id = str(uuid.uuid1())
+        parsed,conn = connect(self._address)
+        call_msg = {'remotecall':message_id,
+                    'method':method,
+                    'message':message,
+                    'timeout':timeout}
+        resp = conn.request('POST',parsed[2],json.dumps(call_msg,default=handle_address))
+        if timeout is None:
+            cancel = None
+        else:
+            ## Raise any TimeoutError to the caller so they can handle it
+            cancel = eventlet.Timeout(timeout, eventlet.TimeoutError)
+
+        resp = conn.getresponse()
+        stat = resp.status
+        rstr = resp.read()
+
+        if stat == 202:
+            rjson = json.loads(rstr,object_hook=generate_address)
+            return rjson['message']
+        elif stat == 404:
+            rjson = json.loads(rstr,object_hook=generate_address)
+            raise RemoteAttributeError(rjson['invalid_method'])
+        elif stat == 406:
+            rjson = json.loads(rstr,object_hook=generate_address)
+            raise RemoteException(rjson['exception'])
+        elif stat == 408:
+            rjson = json.loads(rstr,object_hook=generate_address)
+            raise eventlet.TimeoutError(rjson['timeout'])
+        else:
+            raise RemoteException("Unknown remote response "+str(stat))
+
     def kill(self):
         parsed, conn = connect(self._address)
         conn.request('DELETE', parsed[2])
             "Need some sort of COMET protocol to implement this?")
 
 
-CALL_PATTERN = {'call': str, 'method': str, 'address': Address, 'message': object}
+CALL_PATTERN = {'call': str, 
+                'method': str, 
+                'address': Address, 
+                'message': object}
+REMOTE_CALL_PATTERN = {'remotecall':str,
+                       'method':str,
+                       'message':object,
+                       'timeout':object}
 RESPONSE_PATTERN = {'response': str, 'message': object}
 INVALID_METHOD_PATTERN = {'response': str, 'invalid_method': str}
 EXCEPTION_PATTERN = {'response': str, 'exception':object}
 
 import traceback
-from pyact import actor
+import eventlet
+from pyact import actor, shape
 
 
 def spawn_code(code_string):
             traceback.print_exc()
 
 
+RSP_PAT = {'response':str, 'message':object}
+EXC_PAT = {'response':str, 'exception':object}
+INV_PAT = {'response':str, 'invalid_method':str}
+
+class LocalCaller(actor.Actor):
+    """
+    Performs a local call on behalf of a remote caller and 
+    immediately exists
+    """
+    def main(self,local_addr,message_id,method,message,timeout):
+        my_address = eventlet.getcurrent().address
+        local_addr | {'call':message_id,
+                      'method':method,
+                      'address':my_address,
+                      'message':message}
+        if timeout is None:
+            cancel = None
+        else:
+            cancel = eventlet.Timeout(timeout,eventlet.TimeoutError)
+        RSP = {'response':message_id,'message':object}
+        EXC = {'response':message_id,'exception':object}
+        INV = {'response':message_id,'invalid_method':str}
+        _,res = self.receive(RSP,EXC,INV)
+        return res
+
 class ActorApplication(object):
+
     def __call__(self, env, start_response):
         path = env['PATH_INFO'][1:]
-        method = env['REQUEST_METHOD']
-        if method == 'PUT':
-            if not path:
-                start_response('405 Method Not Allowed', [('Content-type', 'text/plain')])
-                return 'Method Not Allowed\n'
-            new_actor = EvalActor.spawn(path, env['wsgi.input'].read(int(env['CONTENT_LENGTH'])))
+        method = 'do_'+env['REQUEST_METHOD']
+        if hasattr(self,method):
+            return getattr(self,method)(path,env,start_response)
+
+    def do_PUT(self,path,env,start_response):
+        if not path:
+            start_response('405 Method Not Allowed', [('Content-type', 'text/plain')])
+            return 'Method Not Allowed\n'
+        new_actor = EvalActor.spawn(path, env['wsgi.input'].read(int(env['CONTENT_LENGTH'])))
+        start_response('202 Accepted', [('Content-type', 'text/plain')])
+        return 'Accepted\n'
+
+    def do_POST(self,path,env,start_response):
+        local_address = 'http://%s/' % (env['HTTP_HOST'], )
+        old_actor = actor.Actor.all_actors.get(path)
+
+        if old_actor is None:
+            start_response('404 Not Found', [('Content-type', 'text/plain')])
+            return "Not Found\n"
+
+        try:
+            body = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
+            def generate_address(obj):
+                if obj.keys() == ['address']:
+                    address = obj['address']
+                    if address.startswith(local_address):
+                        new_obj = {'address': address[len(local_address):]}
+                        return actor.generate_address(new_obj)
+                return obj
+            msg = actor.json.loads(body, object_hook=generate_address)
+        except Exception, e:
+            traceback.print_exc()
+            start_response('406 Not Acceptable', [('Content-type', 'text/plain')])
+            return 'Not Acceptable\n'
+
+        if not shape.is_shaped(msg,actor.REMOTE_CALL_PATTERN):
+            old_actor.address.cast(msg)
             start_response('202 Accepted', [('Content-type', 'text/plain')])
             return 'Accepted\n'
-        elif method == 'POST':
-            old_actor = actor.Actor.all_actors.get(path)
-            if old_actor is None:
-                start_response('404 Not Found', [('Content-type', 'text/plain')])
-                return "Not Found\n"
-            try:
-                body = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
-                local_address = 'http://%s/' % (env['HTTP_HOST'], )
-                def generate_address(obj):
-                    if obj.keys() == ['address'] and obj['address'].startswith(local_address):
-                        return actor.generate_address({'address': obj['address'][len(local_address):]})
-                    return obj
-                msg = actor.json.loads(body, object_hook=generate_address)
-            except Exception, e:
-                traceback.print_exc()
-                start_response('406 Not Acceptable', [('Content-type', 'text/plain')])
-                return 'Not Acceptable\n'
-            old_actor.address.cast(msg)
-            start_response('202 Accepted', [('Content-type', 'text/plain')])
-            return 'Accepted\n'            
-        elif method == 'DELETE':
-            old_actor = actor.Actor.all_actors.get(path)
-            if old_actor is None:
-                start_response('404 Not Found', [('Content-type', 'text/plain')])
-                return "Not Found\n"
-            old_actor.address.kill()
+        
+        # handle a remote call
+        try:
+            rmsg = LocalCaller.spawn(local_addr=old_actor.address,
+                                     message_id=msg['remotecall'],
+                                     method=msg['method'],
+                                     message=msg['message'],
+                                     timeout=msg['timeout']).wait()
+        except eventlet.TimeoutError:
+            start_response('408 Request Timeout',[('Content-type','text/plain')])
+            return actor.json.dumps({'timeout':msg['timeout']})+'\n'
+
+        def handle_address(obj):
+            if isinstance(obj, actor.Address):
+                return {'address': local_address + obj.actor_id}
+            raise TypeError(obj)
+
+        resp_str = actor.json.dumps(rmsg, default=handle_address)+'\n'
+        if shape.is_shaped(rmsg, RSP_PAT):
+            start_response('202 Accepted',[('Content-type','application/json')])
+        elif shape.is_shaped(rmsg, INV_PAT):
+            start_response('404 Not Found',[('Content-type','application/json')])
+        else:
+            start_response('406 Not Acceptable',[('Content-type','application/json')])
+        return resp_str
+
+
+    def do_DELETE(self,path,env,start_response):
+        old_actor = actor.Actor.all_actors.get(path)
+        if old_actor is None:
+            start_response('404 Not Found', [('Content-type', 'text/plain')])
+            return "Not Found\n"
+        old_actor.address.kill()
+        start_response('200 OK', [('Content-type', 'text/plain')])
+        return '\n'
+
+    def do_HEAD(self,path,env,start_response):
+        old_actor = actor.Actor.all_actors.get(path)
+        if old_actor is None:
+            start_response('404 Not Found', [('Content-type', 'text/plain')])
+            return "\n"
+        start_response('200 OK', [('Content-type', 'application/json')])
+        return "\n"
+
+    def do_GET(self,path,env,start_response):
+
+        if not path:
             start_response('200 OK', [('Content-type', 'text/plain')])
-            return '\n'
-        elif method == 'HEAD':
-            old_actor = actor.Actor.all_actors.get(path)
-            if old_actor is None:
-                start_response('404 Not Found', [('Content-type', 'text/plain')])
-                return "\n"
-            start_response('200 OK', [('Content-type', 'application/json')])
-            return "\n"
-        elif method == 'GET':
-            if not path:
-                start_response('200 OK', [('Content-type', 'text/plain')])
-                return 'index\n'
-            elif path == 'some-js-file.js':
-                start_response('200 OK', [('Content-type', 'text/plain')])
-                return 'some-js-file\n'
-            old_actor = actor.Actor.all_actors.get(path)
-            if old_actor is None:
-                start_response('404 Not Found', [('Content-type', 'text/plain')])
-                return "Not Found\n"
-            start_response('200 OK', [('Content-type', 'application/json')])
-            local_address = 'http://%s/' % (env['HTTP_HOST'], )
-            def handle_address(obj):
-                if isinstance(obj, actor.Address):
-                    return {'address': local_address + obj.actor_id}
-                raise TypeError(obj)
-            to_dump = dict([(x, y) for (x, y) in vars(old_actor).items() if not x.startswith('_')])
-            return actor.json.dumps(to_dump, default=handle_address) + '\n'
+            return 'index\n'
+        elif path == 'some-js-file.js':
+            start_response('200 OK', [('Content-type', 'text/plain')])
+            return 'some-js-file\n'
+
+        old_actor = actor.Actor.all_actors.get(path)
+        if old_actor is None:
+            start_response('404 Not Found', [('Content-type', 'text/plain')])
+            return "Not Found\n"
+        start_response('200 OK', [('Content-type', 'application/json')])
+        local_address = 'http://%s/' % (env['HTTP_HOST'], )
+
+        def handle_address(obj):
+            if isinstance(obj, actor.Address):
+                return {'address': local_address + obj.actor_id}
+            raise TypeError(obj)
+
+        to_dump = dict([(x, y) for (x, y) in vars(old_actor).items() if not x.startswith('_')])
+        return actor.json.dumps(to_dump, default=handle_address) + '\n'
+        
 
 
 app = ActorApplication()