Commits

Jeffrey Gelens committed c802063

Wamp Pubsub

Comments (0)

Files changed (7)

examples/plot_graph.py

 
 
 if __name__ == "__main__":
-    resource = Resource(apps={
+    resource = Resource({
         '/': static_wsgi_app,
         '/data': PlotApplication
     })

examples/wamp_example.py

     def on_open(self):
         self.wamp.register_procedure("http://localhost:8000/calc#add", self.add)
         self.wamp.register_object("http://localhost:8000/test#", RPCTestClass)
+        self.wamp.register_pubsub("http://localhost:8000/somechannel")
+
         self.wamp.send_welcome()
         print "opened"
 
 
 
 if __name__ == "__main__":
-    resource = Resource(apps={
+    resource = Resource({
         '/': WampApplication
     })
 

geventwebsocket/handler.py

 from .logging import create_logger
 
 
+class Client(object):
+    def __init__(self, address, ws):
+        self.address = address
+        self.ws = ws
+
+
 class WebSocketHandler(WSGIHandler):
     """
     Automatically upgrades the connection to a websocket.
         # Since we're now a websocket connection, we don't care what the
         # application actually responds with for the http response
         try:
+            self.server.clients[self.client_address] = Client(self.client_address, self.websocket)
             self.application(self.environ, lambda s, h: [])
         finally:
+            del self.server.clients[self.client_address]
             self.websocket.close()
 
     def run_application(self):
 
         return self.server.logger
 
+    @property
+    def active_client(self):
+        return self.server.clients[self.client_address]
     def start_response(self, status, headers, exc_info=None):
         """
         Called when the handler is ready to send a response back to the remote

geventwebsocket/protocols/base.py

         else:
             raise Exception("No application coupled")
 
+    @property
+    def server(self):
+        if not hasattr(self.app, 'ws'):
+            return None
+
+        return self.app.ws.handler.server
+
+    @property
+    def handler(self):
+        if not hasattr(self.app, 'ws'):
+            return None
+
+        return self.app.ws.handler

geventwebsocket/protocols/wamp.py

 import inspect
+import json
+import random
+import string
 import types
-import json
 
 from .base import BaseProtocol
 
 
 def export_rpc(arg=None):
-   if type(arg) is types.FunctionType:
-      arg._rpc = arg.__name__
-      return arg
+    if isinstance(arg, types.FunctionType):
+        arg._rpc = arg.__name__
+    return arg
+
+
+def serialize(data):
+    return json.dumps(data)
 
 
 class Prefixes(object):
     def register_object(self, uri, obj):
         for k in inspect.getmembers(obj, inspect.ismethod):
             if '_rpc' in k[1].__dict__:
-               proc_uri = uri + k[1]._rpc
-               self.calls[proc_uri] = (obj, k[1])
+                proc_uri = uri + k[1]._rpc
+                self.calls[proc_uri] = (obj, k[1])
 
     def call(self, uri, args):
         if uri in self.calls:
             raise Exception("no such uri '{}'".format(uri))
 
 
+class Channels(object):
+    def __init__(self):
+        self.channels = {}
+
+    def create(self, uri, prefix_matching=False):
+        if uri not in self.channels:
+            self.channels[uri] = []
+
+        # TODO: implement prefix matching
+
+    def subscribe(self, uri, client):
+        print "subcsription to ", uri
+        print "of client ", client
+        if uri in self.channels:
+            self.channels[uri].append(client)
+
+    def unsubscribe(self, uri, client):
+        if uri not in self.channels:
+            return
+
+        client_index = self.channels[uri].index(client)
+        self.channels[uri].pop(client_index)
+
+        if len(self.channels[uri]) == 0:
+            del self.channels[uri]
+
+    def publish(self, uri, event, exclude=None, eligible=None):
+        if uri not in self.channels:
+            return
+
+        # TODO: exclude & eligible
+
+        msg = [WampProtocol.MSG_EVENT, uri, event]
+        print msg
+
+        for client in self.channels[uri]:
+            client.ws.send(serialize(msg))
+
+
 class WampProtocol(BaseProtocol):
-    MSG_WELCOME = 0;
-    MSG_PREFIX = 1;
-    MSG_CALL = 2;
-    MSG_CALL_RESULT = 3;
-    MSG_CALL_ERROR = 4;
-    MSG_SUBSCRIBE = 5;
-    MSG_UNSUBSCRIBE = 6;
-    MSG_PUBLISH = 7;
-    MSG_EVENT = 8;
+    MSG_WELCOME = 0
+    MSG_PREFIX = 1
+    MSG_CALL = 2
+    MSG_CALL_RESULT = 3
+    MSG_CALL_ERROR = 4
+    MSG_SUBSCRIBE = 5
+    MSG_UNSUBSCRIBE = 6
+    MSG_PUBLISH = 7
+    MSG_EVENT = 8
 
     PROTOCOL_NAME = "wamp"
 
     def __init__(self, *args, **kwargs):
         self.procedures = RemoteProcedures()
         self.prefixes = Prefixes()
-        self.session_id = "3434324"  # TODO generate
+        self.session_id = ''.join(
+            [random.choice(string.digits + string.letters)
+                for i in xrange(16)])
 
         super(WampProtocol, self).__init__(*args, **kwargs)
 
-    def _serialize(self, data):
-        return json.dumps(data)
-
     def register_procedure(self, *args, **kwargs):
         self.procedures.register_procedure(*args, **kwargs)
 
     def register_object(self, *args, **kwargs):
         self.procedures.register_object(*args, **kwargs)
 
+    def register_pubsub(self, *args, **kwargs):
+        if not hasattr(self.server, 'channels'):
+            self.server.channels = Channels()
+
+        self.server.channels.create(*args, **kwargs)
+
     def send_welcome(self):
         from geventwebsocket import get_version
 
             1,
             'gevent-websocket/' + get_version()
         ]
-        self.app.ws.send(self._serialize(welcome))
+        self.app.ws.send(serialize(welcome))
+
+    def rpc_call(self, data):
+        call_id, curie_or_uri = data[1:3]
+        args = data[3:]
+
+        if not isinstance(call_id, (str, unicode)):
+            raise Exception()
+        if not isinstance(curie_or_uri, (str, unicode)):
+            raise Exception()
+
+        uri = self.prefixes.resolve(curie_or_uri)
+
+        try:
+            result = self.procedures.call(uri, args)
+            result_msg = [self.MSG_CALL_RESULT, call_id, result]
+        except Exception, e:
+            result_msg = [self.MSG_CALL_ERROR,
+                          call_id, 'http://TODO#generic',
+                          str(type(e)), str(e)]
+
+        self.app.on_message(serialize(result_msg))
+
+    def pubsub_action(self, data):
+        action = data[0]
+        curie_or_uri = data[1]
+
+        if not isinstance(action, int):
+            raise Exception()
+        if not isinstance(curie_or_uri, (str, unicode)):
+            raise Exception()
+
+        uri = self.prefixes.resolve(curie_or_uri)
+
+        if action == self.MSG_SUBSCRIBE and len(data) == 2:
+            # resolve prefixe
+            self.server.channels.subscribe(data[1], self.handler.active_client)
+
+        elif action == self.MSG_UNSUBSCRIBE and len(data) == 2:
+            # resolve prefixes
+            self.server.channels.unsubscribe(
+                data[1], self.handler.active_client)
+
+        elif action == self.MSG_PUBLISH and len(data) >= 3:
+            payload = data[2] if len(data) >= 3 else None
+            exclude = data[3] if len(data) >= 4 else None
+            eligible = data[4] if len(data) >= 5 else None
+
+            print "data", data
+            print "payload", payload
+            print "exclude", exclude
+            print "eligible", eligible
+            self.server.channels.publish(uri, payload, exclude, eligible)
 
     def on_open(self):
         self.app.on_open()
             prefix, uri = data[1:3]
             self.prefixes.add(prefix, uri)
 
-        if data[0] == self.MSG_CALL and len(data) >= 3:
-            call_id, curie_or_uri = data[1:3]
-            args = data[3:]
+        elif data[0] == self.MSG_CALL and len(data) >= 3:
+            return self.rpc_call(data)
 
-            if not isinstance(call_id, (str, unicode)):
-                raise Exception()
-            if not isinstance(curie_or_uri, (str, unicode)):
-                raise Exception()
+        elif data[0] in (self.MSG_SUBSCRIBE, self.MSG_UNSUBSCRIBE,
+                         self.MSG_PUBLISH):
+            return self.pubsub_action(data)
+        else:
+            raise Exception("Unknown call")
 
-            uri = self.prefixes.resolve(curie_or_uri)
-
-            try:
-                result = self.procedures.call(uri, args)
-                result_msg = [self.MSG_CALL_RESULT, call_id, result]
-            except Exception, e:
-                result_msg = [self.MSG_CALL_ERROR,
-                              call_id, 'http://TODO#generic',
-                              str(type(e)), str(e)]
-
-            self.app.on_message(self._serialize(result_msg))
 
     def on_close(self):
         self.app.on_close()
-

geventwebsocket/resource.py

 
 
 class Resource(object):
-    def __init__(self, environ=None, apps=[]):
+    def __init__(self, apps=None, environ=None):
         self.environ = environ
         self.ws = None
-        self.apps = apps
+        self.apps = apps if apps else []
         self.current_app = None
 
     def app_protocol(self, path):
         else:
             raise Exception("No apps defined")
 
-    def call(self, environ, start_response):
+    def run_app(self, environ, start_response):
         if self.environ['PATH_INFO'] in self.apps:
             return self.apps[self.environ['PATH_INFO']](environ, start_response)
         else:
 
             return None
         else:
-            return self.call(environ, start_response)
+            return self.run_app(environ, start_response)

geventwebsocket/server.py

 from gevent.pywsgi import WSGIServer
 
-from .resource import Resource
 from .handler import WebSocketHandler
 from .logging import create_logger
 
         self.debug = kwargs.pop('debug', False)
         self.pre_start_hook = kwargs.pop('pre_start_hook', None)
         self._logger = None
+        self.clients = {}
 
         kwargs['handler_class'] = WebSocketHandler
         super(WebSocketServer, self).__init__(*args, **kwargs)
 
     def handle(self, socket, address):
+        print "Connected Clients: ", str(self.clients.keys())
+
         handler = self.handler_class(socket, address, self)
         handler.handle()