pyzmq-article / test / test_zmqproc.py

from zmq.eventloop import ioloop
import mock
import pytest
import zmq

import zmqproc


class TestZmqProcess(object):
    """Tests for :class:`zmqproc.ZmqProcess`."""

    def test_setup(self):
        zp = zmqproc.ZmqProcess()
        zp.setup()

        assert isinstance(zp.context, zmq.Context)
        assert isinstance(zp.loop, ioloop.IOLoop)

    @pytest.mark.parametrize('kwargs', [
        dict(sock_type=23, addr='127.0.0.1:1234', bind=True,
              callback=mock.Mock()),
        dict(sock_type=23, addr='127.0.0.1', bind=True,
              callback=mock.Mock()),
        dict(sock_type=zmq.SUB, addr=('localhost', 1234), bind=False,
              callback=mock.Mock(), subscribe=b'ohai'),
    ])
    def test_stream(self, kwargs):
        zp = zmqproc.ZmqProcess()

        # Patch the ZmqProcess instance
        zp.context = mock.Mock(spec_set=zmq.Context)
        zp.loop = mock.Mock(spec_set=ioloop.IOLoop)
        sock_mock = zp.context.socket.return_value
        sock_mock.bind_to_random_port.return_value = 42

        # Patch ZMQStream and start testing
        with mock.patch('zmq.eventloop.zmqstream.ZMQStream') as zmqstream_mock:
            stream, port = zp.stream(**kwargs)

            # Assert that the return values are correct
            assert stream is zmqstream_mock.return_value
            if isinstance(kwargs['addr'], tuple):
                assert port == kwargs['addr'][1]
            elif ':' in kwargs['addr']:
                assert port == int(kwargs['addr'][-4:])
            else:
                assert port == sock_mock.bind_to_random_port.return_value

            # Check that the socket was crated correctly
            assert zp.context.socket.call_args == ((kwargs['sock_type'],), {})
            if kwargs['bind'] and ':' in kwargs['addr']:
                assert sock_mock.bind.call_args == (
                        ('tcp://%s' % kwargs['addr'],), {})
            elif kwargs['bind']:
                assert sock_mock.bind_to_random_port.call_args == (
                        ('tcp://%s' % kwargs['addr'],), {})
            else:
                assert sock_mock.connect.call_args == (
                        ('tcp://%s:%s' % kwargs['addr'],), {})

            # Check creation of the stream
            assert zmqstream_mock.call_args == ((sock_mock, zp.loop), {})
            assert zmqstream_mock.return_value.on_recv.call_args == (
                    (kwargs['callback'],), {})

            # Check default subscribtion
            if 'subscribe' in kwargs:
                assert sock_mock.setsockopt.call_args == (
                        (zmq.SUBSCRIBE, kwargs['subscribe']), {})
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.