Emil Vladev  committed 26b783b

Move to PUSH/PULL pairs. Support disconecting of clients

  • Participants
  • Parent commits e27f869
  • Branches default
  • Tags v0.7

Comments (0)

Files changed (5)

 Implementing the SockJS protocol is not trivial, as it supports a handful of
 transports. However, good `ZeroMQ bindings`_ exist for pretty much every popular
+language. This simple proxy allows the use of Real-time web communication (via
+SockJS_) for nearly every popular language.
-The incomming messages socket is a ``PUB`` one (by default on port ``9241``), where
+The incomming messages socket is a ``PUSH`` one (by default on port ``9241``), where
 the proxy will push messages to. All messages are 3-part messages in the form of:
-The outgoing socket is a ``SUB`` one (by default on port ``9242``), where the
+The outgoing socket is a ``PULL`` one (by default on port ``9242``), where the
 proxy will read messages and forward them to the SockJS client (usually a browser).
 The format is like the one for incomming messages:
 | ``data``         |
-However, as of now the only ``message_type`` is:
+As of now the supported ``message_type`` are:
     Send this to forward a message to a client with ``session_id``.
     ``data`` contains the message (as bytes) for the client.
+    Disconnect the client with the given ``session_id``.
+    ``data`` is discarded but *must* be present in the message as
+    a part (usually empty).
-    usage: sockjsproxy [-h] [--address ADDRESS] [--in-port IN_PORT]
+    usage: sockjsproxy [-h] [--version] [--address ADDRESS] [--in-port IN_PORT]
                        [--out-port OUT_PORT] [--http-port HTTP_PORT]
                        [--static-path STATIC_PATH] [--static-url STATIC_URL]
                        [--samples SAMPLES] [--verbose]
     optional arguments:
       -h, --help            show this help message and exit
+      --version             show program's version number and exit
       --address ADDRESS, -a ADDRESS
                             The 0MQ address to listen to. Defaults to * (meaning -
                             The url where the files in --static-path will be
       --samples SAMPLES     Serve samples under SAMPLES path
-      --verbose             Make the server output more information - useful for
+      --verbose, -v         Make the server output more information - useful for
 Then visit ``http://localhost:8080/samples/``,
 play with the demo, see the log output and play with the code.
+This simple ```` script is a simple echo server,
+that will reply with the current time to every message and force
+the client to disconnect (by sending a ``disconnect`` message to
+the proxy server).
-Emil Ivanov
+- Emil Ivanov
+  - Switch from ``PUB/SUB`` socket pair to ``PUSH/PULL``.
+  - Add support to force the client to disconnect.
 .. _Python:
 from setuptools import setup, find_packages
+from sockjsproxy import VERSION
-      version='0.6.2',
+      version=VERSION,
       author='Emil Ivanov',

File sockjsproxy/

+VERSION = '0.7.0'

File sockjsproxy/samples/

 def main():
     ctx = zmq.Context()
-    in_socket = ctx.socket(zmq.SUB)
-    in_socket.setsockopt(zmq.SUBSCRIBE, '')
+    in_socket = ctx.socket(zmq.PULL)
-    out_socket = ctx.socket(zmq.PUB)
+    out_socket = ctx.socket(zmq.PUSH)
     print 'Listening for messages'
                                            'The time now is: ' + str(])
+                out_socket.send_multipart(['disconnect',
+                                           session_id,
+                                           ''])
     except KeyboardInterrupt:

File sockjsproxy/

+from __future__ import absolute_import
 import logging
 import argparse
 import time, signal
+import os.path as p
+from sockjsproxy import VERSION
 import zmq
 from zmq.eventloop import zmqstream
         if command == 'message':
             self.frontend.send(session_id, message)
         elif command == 'disconnect':
-            # TODO: Drop the connection
-            pass
+            self.frontend.close(session_id)
             log.warn("Invalid command %s", command)
+    def close(self, session_id):
+        connection = self.sessions.get(session_id)
+        if not connection:
+            log.warn('Could not close session %s: '
+                     'Connection not found.', session_id)
+            return
+        connection.close()
 class SockJSProxy(object):
     def collect_args(self):
         parser = argparse.ArgumentParser(description="Proxy messages between sock.js and a 0MQ socket")
+        parser.add_argument('--version', action='version', version="%(prog)s " + VERSION)
         parser.add_argument('--address', '-a', type=str,
                             help="The 0MQ address to listen to. Defaults to * (meaning - everything)",
         parser.add_argument('--samples', type=str,
                             help='Serve samples under SAMPLES path')
-        parser.add_argument('--verbose', action='store_true', default=False,
+        parser.add_argument('--verbose', '-v', action='store_true', default=False,
                             help='Make the server output more information - useful for debugging')
         args = parser.parse_args()
         if bool(args.static_path) != bool(args.static_url):
             parser.error("You should specify both --static-path and --static-url to have static files served.")
+        if args.static_path and (not p.exists(args.static_path) or
+                                 not p.isdir(args.static_path)):
+            parser.error("Static directory %s does not "
+                         "exists or not directory" % args.static_path)
         return args
     def init_logging(self, verbose):"Pulling outgoing messages from: %s", out_address)
         ctx = zmq.Context()
-        in_socket = ctx.socket(zmq.PUB)
+        in_socket = ctx.socket(zmq.PUSH)
-        out_socket = ctx.socket(zmq.SUB)
+        out_socket = ctx.socket(zmq.PULL)
-        out_socket.setsockopt(zmq.SUBSCRIBE, '')
         io_loop = ioloop.IOLoop.instance() # ZMQ loop
         if args.samples:
             sample_url = '/' + args.samples.strip('/')
-            import os.path as p
             sample_path = p.join(p.abspath(p.dirname(__file__)), 'samples')
   "Serving samples content at %s", sample_url)
             routes += [(r"%s/?(.*)" % sample_url, web.StaticFileHandler,