Commits

Anonymous committed 26b783b

Move to PUSH/PULL pairs. Support disconecting of clients

  • Participants
  • Parent commits e27f869
  • Tags v0.7

Comments (0)

Files changed (5)

 
 Implementing the SockJS protocol is not trivial, as it supports a handful of
 transports. However, good `ZeroMQ bindings`_ exist for pretty much every popular
-language.
+language. This simple proxy allows the use of Real-time web communication (via
+SockJS_) for nearly every popular language.
 
 Implementation
 --------------
 
 ----------
 
-The incomming messages socket is a ``PUB`` one (by default on port ``9241``), where
+The incomming messages socket is a ``PUSH`` one (by default on port ``9241``), where
 the proxy will push messages to. All messages are 3-part messages in the form of:
 
 +------------------+
 
 ----------
 
-The outgoing socket is a ``SUB`` one (by default on port ``9242``), where the
+The outgoing socket is a ``PULL`` one (by default on port ``9242``), where the
 proxy will read messages and forward them to the SockJS client (usually a browser).
 
 The format is like the one for incomming messages:
 | ``data``         |
 +------------------+
 
-However, as of now the only ``message_type`` is:
+As of now the supported ``message_type`` are:
 
 ``message``
     Send this to forward a message to a client with ``session_id``.
     ``data`` contains the message (as bytes) for the client.
+``disconnect``
+    Disconnect the client with the given ``session_id``.
+    ``data`` is discarded but *must* be present in the message as
+    a part (usually empty).
 
 
 Installation
 
 ::
 
-    usage: sockjsproxy [-h] [--address ADDRESS] [--in-port IN_PORT]
+    usage: sockjsproxy [-h] [--version] [--address ADDRESS] [--in-port IN_PORT]
                        [--out-port OUT_PORT] [--http-port HTTP_PORT]
                        [--static-path STATIC_PATH] [--static-url STATIC_URL]
                        [--samples SAMPLES] [--verbose]
 
     optional arguments:
       -h, --help            show this help message and exit
+      --version             show program's version number and exit
       --address ADDRESS, -a ADDRESS
                             The 0MQ address to listen to. Defaults to * (meaning -
                             everything)
                             The url where the files in --static-path will be
                             served
       --samples SAMPLES     Serve samples under SAMPLES path
-      --verbose             Make the server output more information - useful for
+      --verbose, -v         Make the server output more information - useful for
                             debugging
 
+
 Samples
 -------
 
 
 Then visit ``http://localhost:8080/samples/``,
 play with the demo, see the log output and play with the code.
+This simple ``datereply-sjp.py`` script is a simple echo server,
+that will reply with the current time to every message and force
+the client to disconnect (by sending a ``disconnect`` message to
+the proxy server).
 
 Licence
 -------
 
 Authors
 -------
-Emil Ivanov
+- Emil Ivanov
 
+Changelog
+---------
+0.7
+  - Switch from ``PUB/SUB`` socket pair to ``PUSH/PULL``.
+  - Add support to force the client to disconnect.
 
 
 .. _Python: http://python.org/
 
 from setuptools import setup, find_packages
 
+from sockjsproxy import VERSION
+
 setup(name='sockjsproxy',
-      version='0.6.2',
+      version=VERSION,
       author='Emil Ivanov',
       author_email='emil.vladev@gmail.com',
       url='https://bitbucket.org/vladev/sockjsproxy',

File sockjsproxy/__init__.py

+VERSION = '0.7.0'

File sockjsproxy/samples/datereply-sjp.py

 
 def main():
     ctx = zmq.Context()
-    in_socket = ctx.socket(zmq.SUB)
-    in_socket.setsockopt(zmq.SUBSCRIBE, '')
+    in_socket = ctx.socket(zmq.PULL)
     in_socket.connect('tcp://localhost:9241')
-    out_socket = ctx.socket(zmq.PUB)
+    out_socket = ctx.socket(zmq.PUSH)
     out_socket.connect('tcp://localhost:9242')
 
     print 'Listening for messages'
                 out_socket.send_multipart(['message',
                                            session_id,
                                            'The time now is: ' + str(datetime.datetime.now())])
+                out_socket.send_multipart(['disconnect',
+                                           session_id,
+                                           ''])
     except KeyboardInterrupt:
         pass
 

File sockjsproxy/sockjsproxy.py

+from __future__ import absolute_import
+
 import logging
 import argparse
 import time, signal
+import os.path as p
+
+from sockjsproxy import VERSION
 
 import zmq
 from zmq.eventloop import zmqstream
         if command == 'message':
             self.frontend.send(session_id, message)
         elif command == 'disconnect':
-            # TODO: Drop the connection
-            pass
+            self.frontend.close(session_id)
         else:
             log.warn("Invalid command %s", command)
 
 
         connection.send(message)
 
+    def close(self, session_id):
+        connection = self.sessions.get(session_id)
+        if not connection:
+            log.warn('Could not close session %s: '
+                     'Connection not found.', session_id)
+            return
+
+        connection.close()
+
 
 class SockJSProxy(object):
     def collect_args(self):
         parser = argparse.ArgumentParser(description="Proxy messages between sock.js and a 0MQ socket")
+        parser.add_argument('--version', action='version', version="%(prog)s " + VERSION)
         parser.add_argument('--address', '-a', type=str,
                             help="The 0MQ address to listen to. Defaults to * (meaning - everything)",
                             default='*')
 
         parser.add_argument('--samples', type=str,
                             help='Serve samples under SAMPLES path')
-        parser.add_argument('--verbose', action='store_true', default=False,
+        parser.add_argument('--verbose', '-v', action='store_true', default=False,
                             help='Make the server output more information - useful for debugging')
 
         args = parser.parse_args()
         if bool(args.static_path) != bool(args.static_url):
             parser.error("You should specify both --static-path and --static-url to have static files served.")
 
+        if args.static_path and (not p.exists(args.static_path) or
+                                 not p.isdir(args.static_path)):
+            parser.error("Static directory %s does not "
+                         "exists or not directory" % args.static_path)
+
         return args
 
     def init_logging(self, verbose):
         log.info("Pulling outgoing messages from: %s", out_address)
 
         ctx = zmq.Context()
-        in_socket = ctx.socket(zmq.PUB)
+        in_socket = ctx.socket(zmq.PUSH)
         in_socket.bind(in_address)
-        out_socket = ctx.socket(zmq.SUB)
+        out_socket = ctx.socket(zmq.PULL)
         out_socket.bind(out_address)
-        out_socket.setsockopt(zmq.SUBSCRIBE, '')
 
         io_loop = ioloop.IOLoop.instance() # ZMQ loop
 
 
         if args.samples:
             sample_url = '/' + args.samples.strip('/')
-            import os.path as p
             sample_path = p.join(p.abspath(p.dirname(__file__)), 'samples')
             log.info("Serving samples content at %s", sample_url)
             routes += [(r"%s/?(.*)" % sample_url, web.StaticFileHandler,