Commits

Stefan Scherfke committed 0b876d6

Forgot to add a renamed file.

Comments (0)

Files changed (2)

example_app/test/test_base.py

+from zmq.eventloop import ioloop
+import mock
+import pytest
+import zmq
+
+import base
+
+
+class TestZmqProcess(object):
+    """Tests for :class:`base.ZmqProcess`."""
+
+    def test_setup(self):
+        zp = base.ZmqProcess()
+        zp.setup()
+
+        assert isinstance(zp.context, zmq.Context)
+        assert isinstance(zp.loop, ioloop.IOLoop)
+
+    @pytest.mark.parametrize('kwargs', [
+        dict(sock_type=23, addr='127.0.0.1:1234', bind=True,
+              callback=mock.Mock()),
+        dict(sock_type=23, addr='127.0.0.1', bind=True,
+              callback=mock.Mock()),
+        dict(sock_type=zmq.SUB, addr=('localhost', 1234), bind=False,
+              callback=mock.Mock(), subscribe=b'ohai'),
+    ])
+    def test_stream(self, kwargs):
+        zp = base.ZmqProcess()
+
+        # Patch the ZmqProcess instance
+        zp.context = mock.Mock(spec_set=zmq.Context)
+        zp.loop = mock.Mock(spec_set=ioloop.IOLoop)
+        sock_mock = zp.context.socket.return_value
+        sock_mock.bind_to_random_port.return_value = 42
+
+        # Patch ZMQStream and start testing
+        with mock.patch('zmq.eventloop.zmqstream.ZMQStream') as zmqstream_mock:
+            stream, port = zp.stream(**kwargs)
+
+            # Assert that the return values are correct
+            assert stream is zmqstream_mock.return_value
+            if isinstance(kwargs['addr'], tuple):
+                assert port == kwargs['addr'][1]
+            elif ':' in kwargs['addr']:
+                assert port == int(kwargs['addr'][-4:])
+            else:
+                assert port == sock_mock.bind_to_random_port.return_value
+
+            # Check that the socket was crated correctly
+            assert zp.context.socket.call_args == ((kwargs['sock_type'],), {})
+            if kwargs['bind'] and ':' in kwargs['addr']:
+                assert sock_mock.bind.call_args == (
+                        ('tcp://%s' % kwargs['addr'],), {})
+            elif kwargs['bind']:
+                assert sock_mock.bind_to_random_port.call_args == (
+                        ('tcp://%s' % kwargs['addr'],), {})
+            else:
+                assert sock_mock.connect.call_args == (
+                        ('tcp://%s:%s' % kwargs['addr'],), {})
+
+            # Check creation of the stream
+            assert zmqstream_mock.call_args == ((sock_mock, zp.loop), {})
+            assert zmqstream_mock.return_value.on_recv.call_args == (
+                    (kwargs['callback'],), {})
+
+            # Check default subscribtion
+            if 'subscribe' in kwargs:
+                assert sock_mock.setsockopt.call_args == (
+                        (zmq.SUBSCRIBE, kwargs['subscribe']), {})
+
+
+class TestMessageHandler(object):
+    """Tests for :class:`base.TestMessageHandler`."""
+
+    @pytest.mark.parametrize(('idx', 'msg'), [
+        (-1, [23, b'["test", null]']),
+        (1, [23, b'["test", "spam"]', 42]),
+        (TypeError, [23, 42]),
+        (ValueError, [23, b'["test"]23spam']),
+    ])
+    def test_call_json_load(self, idx, msg):
+        handler = mock.Mock()
+        logger = mock.Mock()
+        mh = base.MessageHandler(logger, 'test',
+                                 idx if isinstance(idx, int) else -1)
+        mh.test = handler
+
+        if isinstance(idx, int):
+            mh(msg)
+            assert handler.call_count == 1
+        else:
+            pytest.raises(idx, mh, msg)
+
+    @pytest.mark.parametrize(('ok', 'msg'), [
+        (True, [23, b'["test", "spam"]', 42]),
+        (AttributeError, [23, b'["_test", "spam"]', 42]),
+        (TypeError, [23, b'["spam", "spam"]', 42]),
+        (AttributeError, [23, b'["eggs", "spam"]', 42]),
+    ])
+    def test_call_get_handler(self, ok, msg):
+        handler = mock.Mock()
+        logger = mock.Mock()
+        mh = base.MessageHandler(logger, 'test', 1)
+        mh.test = handler
+        mh.spam = 'spam'
+
+        if ok is True:
+            mh(msg)
+            assert handler.call_args == (
+                    (msg[0], 'spam', msg[2]), {})
+        else:
+            pytest.raises(ok, mh, msg)

example_app/test/test_zmqproc.py

-from zmq.eventloop import ioloop
-import mock
-import pytest
-import zmq
-
-import zmqproc
-
-
-class TestZmqProcess(object):
-    """Tests for :class:`zmqproc.ZmqProcess`."""
-
-    def test_setup(self):
-        zp = zmqproc.ZmqProcess()
-        zp.setup()
-
-        assert isinstance(zp.context, zmq.Context)
-        assert isinstance(zp.loop, ioloop.IOLoop)
-
-    @pytest.mark.parametrize('kwargs', [
-        dict(sock_type=23, addr='127.0.0.1:1234', bind=True,
-              callback=mock.Mock()),
-        dict(sock_type=23, addr='127.0.0.1', bind=True,
-              callback=mock.Mock()),
-        dict(sock_type=zmq.SUB, addr=('localhost', 1234), bind=False,
-              callback=mock.Mock(), subscribe=b'ohai'),
-    ])
-    def test_stream(self, kwargs):
-        zp = zmqproc.ZmqProcess()
-
-        # Patch the ZmqProcess instance
-        zp.context = mock.Mock(spec_set=zmq.Context)
-        zp.loop = mock.Mock(spec_set=ioloop.IOLoop)
-        sock_mock = zp.context.socket.return_value
-        sock_mock.bind_to_random_port.return_value = 42
-
-        # Patch ZMQStream and start testing
-        with mock.patch('zmq.eventloop.zmqstream.ZMQStream') as zmqstream_mock:
-            stream, port = zp.stream(**kwargs)
-
-            # Assert that the return values are correct
-            assert stream is zmqstream_mock.return_value
-            if isinstance(kwargs['addr'], tuple):
-                assert port == kwargs['addr'][1]
-            elif ':' in kwargs['addr']:
-                assert port == int(kwargs['addr'][-4:])
-            else:
-                assert port == sock_mock.bind_to_random_port.return_value
-
-            # Check that the socket was crated correctly
-            assert zp.context.socket.call_args == ((kwargs['sock_type'],), {})
-            if kwargs['bind'] and ':' in kwargs['addr']:
-                assert sock_mock.bind.call_args == (
-                        ('tcp://%s' % kwargs['addr'],), {})
-            elif kwargs['bind']:
-                assert sock_mock.bind_to_random_port.call_args == (
-                        ('tcp://%s' % kwargs['addr'],), {})
-            else:
-                assert sock_mock.connect.call_args == (
-                        ('tcp://%s:%s' % kwargs['addr'],), {})
-
-            # Check creation of the stream
-            assert zmqstream_mock.call_args == ((sock_mock, zp.loop), {})
-            assert zmqstream_mock.return_value.on_recv.call_args == (
-                    (kwargs['callback'],), {})
-
-            # Check default subscribtion
-            if 'subscribe' in kwargs:
-                assert sock_mock.setsockopt.call_args == (
-                        (zmq.SUBSCRIBE, kwargs['subscribe']), {})