Commits

Stefan Scherfke committed a40e608

Completed integration of the MessageHandler class.

Comments (0)

Files changed (4)

example_app/base.py

     message type.
 
     """
-    def __init__(self, logger, stream_name, json_load=-1):
-        self._logger = logger
-        self._stream_name = stream_name
+    def __init__(self, json_load=-1):
         self._json_load = json_load
-        self._argspec_cache = {}
 
     def __call__(self, msg):
         """
         :meth:`zmq.core.socket.Socket.recv_multipart`.
 
         """
-        self._logger.debug('%s stream received: %s' % (self._stream_name, msg))
-
         # Try to JSON-decode the index "self._json_load" of the message
         i = self._json_load
         msg_type, data = json.loads(msg[i])

example_app/pongproc.py

-from zmq.utils import jsonapi as json
 import zmq
 
 import base
         # Make sure this is pickle-able (e.g., not using threads)
         # or it won't work on Windows. If it's not pickle-able, instantiate
         # it in setup().
-        self.ping_handler = PingHandler()
 
     def setup(self):
         """Sets up PyZMQ and creates all streams."""
         super().setup()
 
-        self.rep_stream, _ = self.stream(zmq.REP, self.bind_addr, bind=True,
-                callback=self.handle_rep_stream)
+        self.rep_stream, _ = self.stream(zmq.REP, self.bind_addr, bind=True)
+        self.rep_stream.on_recv(RepStreamHandler(self.rep_stream, self.stop))
 
     def run(self):
         """Sets up everything and starts the event loop."""
         """Stops the event loop."""
         self.loop.stop()
 
-    def handle_rep_stream(self, msg):
-        """
-        Handles messages from a Pinger:
 
-        *ping*
-            Send back a pong.
+class RepStreamHandler(base.MessageHandler):
+    """Handels messages arrvinge at the PongProc’s REP stream."""
+    def __init__(self, rep_stream, stop):
+        super().__init__()
+        self._rep_stream = rep_stream
+        self._stop = stop
+        self._ping_handler = PingHandler()
 
-        *plzdiekthxbye*
-            Stop the ioloop and exit.
+    def ping(self, data):
+        """Send back a pong."""
+        rep = self._ping_handler.make_pong(data)
+        self._rep_stream.send_json(rep)
 
-        """
-        msg_type, data = json.loads(msg[0])
-
-        if msg_type == 'ping':
-            rep = self.ping_handler.make_pong(data)
-            self.rep_stream.send_json(rep)
-
-        elif msg_type == 'plzdiekthxbye':
-            self.stop()
-
-        else:
-            raise RuntimeError('Received unkown message type: %s' % msg_type)
+    def plzdiekthxbye(self, data):
+        """Just calls :meth:`PongProc.stop`."""
+        self._stop()
 
 
 class PingHandler(object):

example_app/test/test_base.py

     ])
     def test_call_json_load(self, idx, msg):
         handler = mock.Mock()
-        logger = mock.Mock()
-        mh = base.MessageHandler(logger, 'test',
-                                 idx if isinstance(idx, int) else -1)
+        mh = base.MessageHandler(idx if isinstance(idx, int) else -1)
         mh.test = handler
 
         if isinstance(idx, int):
     ])
     def test_call_get_handler(self, ok, msg):
         handler = mock.Mock()
-        logger = mock.Mock()
-        mh = base.MessageHandler(logger, 'test', 1)
+        mh = base.MessageHandler(1)
         mh.test = handler
         mh.spam = 'spam'
 

example_app/test/test_pongproc.py

 from zmq.utils import jsonapi as json
 import mock
-import pytest
 import zmq
 
 import pongproc
     return pongproc.PongProc((host, port))
 
 
+def pytest_funcarg__rsh(request):
+    """Creates a RepStreamHandler instance."""
+    return pongproc.RepStreamHandler(
+            rep_stream=mock.Mock(),
+            stop=mock.Mock())
+
+
 def pytest_funcarg__ph(request):
     """Creates a PingHandler instance."""
     return pongproc.PingHandler()
     """Tests :class:`pongproc.PongProc`."""
 
     def test_setup(self, pp):
-        pp.stream = mock.Mock(side_effect=lambda *a, **k: (a[0], mock.Mock()))
+        def make_stream(*args, **kwargs):
+            stream = mock.Mock()
+            stream.type = args[0]
+            return stream, mock.Mock()
+        pp.stream = mock.Mock(side_effect=make_stream)
 
         with mock.patch('base.ZmqProcess.setup') as setup_mock:
             pp.setup()
             assert setup_mock.call_count == 1
 
         assert pp.stream.call_args_list == [
-            ((zmq.REP, (host, port)),
-                dict(bind=True, callback=pp.handle_rep_stream)),
+            ((zmq.REP, (host, port)), dict(bind=True)),
         ]
-        assert pp.rep_stream == zmq.REP
+        assert pp.rep_stream.type == zmq.REP
+        rsh = pp.rep_stream.on_recv.call_args[0][0]  # Get the msg handler
+        assert rsh._rep_stream == pp.rep_stream
+        assert rsh._stop == pp.stop
 
     def test_run(self, pp):
         pp.setup = mock.Mock()
         pp.stop()
         assert pp.loop.stop.call_count == 1
 
-    @pytest.mark.parametrize(('handler', 'msg'), [
-        ('handle_rep_stream', ['["spam", []]']),
-    ])
-    def test_handle_bad_msg(self, pp, handler, msg):
-        pytest.raises(RuntimeError, getattr(pp, handler), msg)
 
-    def test_stop_msg(self, pp):
-        pp.stop = mock.Mock()
-        pp.handle_rep_stream([b'["plzdiekthxbye", null]'])
-        assert pp.stop.call_count == 1
-
-    def test_ping(self, pp):
+class TestRepStreamHandler(object):
+    def test_ping(self, rsh):
         msg = ['ping', 1]
         retval = 'spam'
-        pp.ping_handler = mock.Mock(spec_set=pongproc.PingHandler)
-        pp.ping_handler.make_pong.return_value = retval
-        pp.rep_stream = mock.Mock()
+        rsh._ping_handler = mock.Mock(spec_set=pongproc.PingHandler)
+        rsh._ping_handler.make_pong.return_value = retval
 
-        pp.handle_rep_stream([json.dumps(msg)])
+        rsh([json.dumps(msg)])
 
-        assert pp.ping_handler.make_pong.call_args == ((msg[1],), {})
-        assert pp.rep_stream.send_json.call_args == ((retval,), {})
+        assert rsh._ping_handler.make_pong.call_args == ((msg[1],), {})
+        assert rsh._rep_stream.send_json.call_args == ((retval,), {})
+
+    def test_plzdiekthybye(self, rsh):
+        rsh([b'["plzdiekthxbye", null]'])
+        assert rsh._stop.call_count == 1
 
 
 class TestPingHandler(object):
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.