Stefan Scherfke avatar Stefan Scherfke committed c9eaeea

Moved the example app into a separate folder.

Comments (0)

Files changed (26)

example_app/pongproc.py

+from zmq.utils import jsonapi as json
+import zmq
+
+import zmqproc
+
+
+host = '127.0.0.1'
+port = 5678
+
+
+def ping():
+    """Sends ping requests and waits for replies."""
+    context = zmq.Context()
+    sock = context.socket(zmq.REQ)
+    sock.connect('tcp://%s:%s' % (host, port))
+
+    for i in range(5):
+        sock.send_json(['ping', i])
+        rep = sock.recv_json()
+        print('Ping got reply:', rep)
+
+    sock.send_json(['plzdiekthxbye', None])
+
+
+class PongProc(zmqproc.ZmqProcess):
+    """
+    Main processes for the Ponger. It handles ping requests and sends back
+    a pong.
+
+    """
+    def __init__(self, bind_addr):
+        super().__init__()
+
+        self.bind_addr = bind_addr
+        self.rep_stream = None
+
+        # Make sure this is pickle-able (e.g., not using threads)
+        # or it won't work on Windows. If it's not pickle-able, instantiate
+        # it in setup().
+        self.ping_handler = PingHandler()
+
+    def setup(self):
+        """Sets up PyZMQ and creates all streams."""
+        super().setup()
+
+        self.rep_stream, _ = self.stream(zmq.REP, self.bind_addr, bind=True,
+                callback=self.handle_rep_stream)
+
+    def run(self):
+        """Sets up everything and starts the event loop."""
+        self.setup()
+        self.loop.start()
+
+    def stop(self):
+        """Stops the event loop."""
+        self.loop.stop()
+
+    def handle_rep_stream(self, msg):
+        """
+        Handles messages from a Pinger:
+
+        *ping*
+            Send back a pong.
+
+        *plzdiekthxbye*
+            Stop the ioloop and exit.
+
+        """
+        msg_type, data = json.loads(msg[0])
+
+        if msg_type == 'ping':
+            rep = self.ping_handler.make_pong(data)
+            self.rep_stream.send_json(rep)
+
+        elif msg_type == 'plzdiekthxbye':
+            self.stop()
+
+        else:
+            raise RuntimeError('Received unkown message type: %s' % msg_type)
+
+
+class PingHandler(object):
+
+    def make_pong(self, num_pings):
+        """Creates and returns a pong message."""
+        print('Pong got request number %s' % num_pings)
+
+        return ['pong', num_pings]
+
+
+if __name__ == '__main__':
+    pong_proc = PongProc(bind_addr=(host, port))
+    pong_proc.start()
+
+    ping()
+
+    pong_proc.join()
Add a comment to this file

example_app/test/__init__.py

Empty file added.

example_app/test/data/pongproc.out

+Pong got request number 0
+Pong got request number 1
+Pong got request number 2
+Pong got request number 3
+Pong got request number 4
+Ping got reply: ['pong', 0]
+Ping got reply: ['pong', 1]
+Ping got reply: ['pong', 2]
+Ping got reply: ['pong', 3]
+Ping got reply: ['pong', 4]
Add a comment to this file

example_app/test/data/pongproc.txt

Empty file added.

Add a comment to this file

example_app/test/process/__init__.py

Empty file added.

example_app/test/process/conftest.py

+from inspect import isfunction, isgeneratorfunction
+
+
+def pytest_pycollect_makeitem(collector, name, obj):
+    """
+    Collects all instance methods that are generators and returns them as
+    normal function items.
+
+    """
+    if collector.funcnamefilter(name) and hasattr(obj, '__call__'):
+        if isfunction(obj) or isgeneratorfunction(obj):
+            return collector._genfunctions(name, obj)
+
+
+def pytest_runtest_call(item):
+    """
+    Passes the test generator (``item.obj``) to the ``run()`` method of the
+    generator's instance. This method should be inherited from
+    :class:`test.support.ProcessTest`.
+
+    """
+    if isgeneratorfunction(item.obj):
+        item.obj.__self__.run(item.obj)
+    else:
+        item.runtest()

example_app/test/process/test_pongproc.py

+import pytest
+import zmq
+
+from test.support import ProcessTest, make_sock
+import pongproc
+
+
+pytestmark = pytest.mark.process
+
+host = '127.0.0.1'
+port = 5678
+
+
+class TestProngProc(ProcessTest):
+    """Communication test for the Platform Manager process."""
+
+    def setup_method(self, method):
+        """
+        Creates and starts a pp process and sets up sockets to communicate
+        with it.
+
+        """
+        self.context = zmq.Context()
+
+        # Mimics the ping process
+        self.req_sock = make_sock(self.context, zmq.REQ,
+                                  connect=(host, port))
+
+        self.pp = pongproc.PongProc((host, port))
+        self.pp.start()
+
+    def teardown_method(self, method):
+        """
+        Sends a kill message to the pp and waits for the process to terminate.
+
+        """
+        # Send a stop message to the prong process and wait until it joins
+        self.req_sock.send_multipart([b'["plzdiekthxbye", null]'])
+        self.pp.join()
+
+        # Assert that no more messages are in the pipe
+        pytest.raises(zmq.ZMQError, self.req_sock.recv_multipart)
+
+        self.req_sock.close()
+
+    def test_ping(self):
+        """Tests a ping-pong sequence."""
+        yield ('send', self.req_sock, [], ['ping', 1])
+
+        reply = yield ('recv', self.req_sock)
+        assert reply == [['pong', 1]]

example_app/test/support.py

+"""
+This module contains various functions and classes to support testing.
+
+"""
+import time
+import sys
+
+from zmq.utils import jsonapi as json
+import zmq
+
+
+def make_sock(context, sock_type, bind=None, connect=None):
+    """
+    Creates a *sock_type* typed socket and binds or connects it to the given
+    address.
+
+    """
+    sock = TestSocket(context, sock_type)
+    if bind:
+        sock.bind('tcp://%s:%s' % bind)
+    elif connect:
+        sock.connect('tcp://%s:%s' % connect)
+
+    return sock
+
+
+def get_forwarder(func):
+    """Returns a simple wrapper for *func*."""
+    def forwarder(*args, **kwargs):
+        return func(*args, **kwargs)
+
+    return forwarder
+
+
+def get_wrapped_fwd(func):
+    """
+    Returns a wrapper, that tries to call *func* multiple time in non-blocking
+    mode before rasing an :class:`zmq.ZMQError`.
+
+    """
+    def forwarder(*args, **kwargs):
+        for i in range(100):
+            try:
+                rep = func(*args, flags=zmq.NOBLOCK, **kwargs)
+                return rep
+
+            except zmq.ZMQError:
+                time.sleep(0.01)
+
+        msg = 'Could not %s message.' % func.__name__[:4]
+        raise zmq.ZMQError(msg)
+
+    return forwarder
+
+
+class TestSocket(object):
+    """
+    Wraps ZMQ :class:`~zmq.core.socket.Socket`. All *recv* and *send* methods
+    will be called multiple times in non-blocking mode before a
+    :class:`zmq.ZMQError` is raised.
+
+    """
+    def __init__(self, context, sock_type):
+        self._context = context
+
+        sock = context.socket(sock_type)
+        self._sock = sock
+
+        forwards = [  # These methods can simply be forwarded
+            sock.bind,
+            sock.bind_to_random_port,
+            sock.connect,
+            sock.close,
+            sock.setsockopt,
+        ]
+        wrapped_fwd = [  # These methods are wrapped with a for loop
+            sock.recv,
+            sock.recv_json,
+            sock.recv_multipart,
+            sock.recv_unicode,
+            sock.send,
+            sock.send_json,
+            sock.send_multipart,
+            sock.send_unicode,
+        ]
+
+        for func in forwards:
+            setattr(self, func.__name__, get_forwarder(func))
+
+        for func in wrapped_fwd:
+            setattr(self, func.__name__, get_wrapped_fwd(func))
+
+
+class ProcessTest(object):
+    """
+    Base class for process tests. It offers basic actions for sending and
+    receiving messages and implements the *run* methods that handles the
+    actual test generators.
+
+    """
+    def send(self, socket, header, body, extra_data=[]):
+        """
+        JSON-encodes *body*, concatenates it with *header*, appends
+        *extra_data* and sends it as multipart message over *socket*.
+
+        *header* and *extra_data* should be lists containg byte objects or
+        objects implementing the buffer interface (like NumPy arrays).
+
+        """
+        socket.send_multipart(header + [json.dumps(body)] + extra_data)
+
+    def recv(self, socket, json_load_index=-1):
+        """
+        Receives and returns a multipart message from *socket* and tries to
+        JSON-decode the item at position *json_load_index* (defaults to ``-1``;
+        the last element in the list). The original byte string will be
+        replaced by the loaded object. Set *json_load_index* to ``None`` to get
+        the original, unchanged message.
+
+        """
+        msg = socket.recv_multipart()
+        if json_load_index is not None:
+            msg[json_load_index] = json.loads(msg[json_load_index])
+        return msg
+
+    def run(self, testfunc):
+        """
+        Iterates over the *testfunc* generator and executes all actions it
+        yields. Results will be sent back into the generator.
+
+        :param testfunc: A generator function that yields tuples containing
+                an action keyword, which should be a function of this or
+                the inheriting class (like ``send`` or ``recv``) and additional
+                parameters that will be passed to that function, e.g.:
+                ``('send', socket_obj, ['header'], 'body')``
+        :type testfunc:  generatorfunction
+
+        """
+        item_gen = testfunc()
+        item = next(item_gen)
+
+        def throw_err(skip_levels=0):
+            """
+            Throws the last error to *item_gen* and skips *skip_levels* in
+            the traceback to point to the line that yielded the last event.
+
+            """
+            etype, evalue, tb = sys.exc_info()
+            for i in range(skip_levels):
+                tb = tb.tb_next
+            item_gen.throw(etype, evalue, tb)
+
+        try:
+            while True:
+                try:
+                    ret = getattr(self, item[0])(*item[1:])
+                    item = item_gen.send(ret)
+
+                except zmq.ZMQError:
+                    throw_err(3)  # PyZMQ could not send/recv
+                except AssertionError:
+                    throw_err(1)  # Error in the test
+        except StopIteration:
+            pass
Add a comment to this file

example_app/test/system/__init__.py

Empty file added.

example_app/test/system/test_pongproc.py

+import os.path
+import subprocess
+
+import pytest
+
+
+pytestmark = pytest.mark.system
+
+
+def test_pongproc():
+    filename = os.path.join('test', 'data', 'pongproc.out')
+    expected = open(filename).read()
+
+    output = subprocess.check_output(['python', 'pongproc.py'],
+                                     universal_newlines=True)
+
+    assert output == expected

example_app/test/test_pongproc.py

+from zmq.utils import jsonapi as json
+import mock
+import pytest
+import zmq
+
+import pongproc
+
+
+host = '127.0.0.1'
+port = 5678
+
+
+def pytest_funcarg__pp(request):
+    """Creates a PongProc instance."""
+    return pongproc.PongProc((host, port))
+
+
+def pytest_funcarg__ph(request):
+    """Creates a PingHandler instance."""
+    return pongproc.PingHandler()
+
+
+class TestPongProc(object):
+    """Tests :class:`pongproc.PongProc`."""
+
+    def test_setup(self, pp):
+        pp.stream = mock.Mock(side_effect=lambda *a, **k: (a[0], mock.Mock()))
+
+        with mock.patch('zmqproc.ZmqProcess.setup') as setup_mock:
+            pp.setup()
+            assert setup_mock.call_count == 1
+
+        assert pp.stream.call_args_list == [
+            ((zmq.REP, (host, port)),
+                dict(bind=True, callback=pp.handle_rep_stream)),
+        ]
+        assert pp.rep_stream == zmq.REP
+
+    def test_run(self, pp):
+        pp.setup = mock.Mock()
+        pp.loop = mock.Mock()
+
+        pp.run()
+
+        assert pp.setup.call_count == 1
+        assert pp.loop.start.call_count == 1
+
+    def test_stop(self, pp):
+        pp.loop = mock.Mock()
+        pp.stop()
+        assert pp.loop.stop.call_count == 1
+
+    @pytest.mark.parametrize(('handler', 'msg'), [
+        ('handle_rep_stream', ['["spam", []]']),
+    ])
+    def test_handle_bad_msg(self, pp, handler, msg):
+        pytest.raises(RuntimeError, getattr(pp, handler), msg)
+
+    def test_stop_msg(self, pp):
+        pp.stop = mock.Mock()
+        pp.handle_rep_stream([b'["plzdiekthxbye", null]'])
+        assert pp.stop.call_count == 1
+
+    def test_ping(self, pp):
+        msg = ['ping', 1]
+        retval = 'spam'
+        pp.ping_handler = mock.Mock(spec_set=pongproc.PingHandler)
+        pp.ping_handler.make_pong.return_value = retval
+        pp.rep_stream = mock.Mock()
+
+        pp.handle_rep_stream([json.dumps(msg)])
+
+        assert pp.ping_handler.make_pong.call_args == ((msg[1],), {})
+        assert pp.rep_stream.send_json.call_args == ((retval,), {})
+
+
+class TestPingHandler(object):
+
+    def test_make_pong(self, ph):
+        ping_num = 23
+
+        ret = ph.make_pong(ping_num)
+
+        assert ret == ['pong', ping_num]

example_app/test/test_zmqproc.py

+from zmq.eventloop import ioloop
+import mock
+import pytest
+import zmq
+
+import zmqproc
+
+
+class TestZmqProcess(object):
+    """Tests for :class:`zmqproc.ZmqProcess`."""
+
+    def test_setup(self):
+        zp = zmqproc.ZmqProcess()
+        zp.setup()
+
+        assert isinstance(zp.context, zmq.Context)
+        assert isinstance(zp.loop, ioloop.IOLoop)
+
+    @pytest.mark.parametrize('kwargs', [
+        dict(sock_type=23, addr='127.0.0.1:1234', bind=True,
+              callback=mock.Mock()),
+        dict(sock_type=23, addr='127.0.0.1', bind=True,
+              callback=mock.Mock()),
+        dict(sock_type=zmq.SUB, addr=('localhost', 1234), bind=False,
+              callback=mock.Mock(), subscribe=b'ohai'),
+    ])
+    def test_stream(self, kwargs):
+        zp = zmqproc.ZmqProcess()
+
+        # Patch the ZmqProcess instance
+        zp.context = mock.Mock(spec_set=zmq.Context)
+        zp.loop = mock.Mock(spec_set=ioloop.IOLoop)
+        sock_mock = zp.context.socket.return_value
+        sock_mock.bind_to_random_port.return_value = 42
+
+        # Patch ZMQStream and start testing
+        with mock.patch('zmq.eventloop.zmqstream.ZMQStream') as zmqstream_mock:
+            stream, port = zp.stream(**kwargs)
+
+            # Assert that the return values are correct
+            assert stream is zmqstream_mock.return_value
+            if isinstance(kwargs['addr'], tuple):
+                assert port == kwargs['addr'][1]
+            elif ':' in kwargs['addr']:
+                assert port == int(kwargs['addr'][-4:])
+            else:
+                assert port == sock_mock.bind_to_random_port.return_value
+
+            # Check that the socket was crated correctly
+            assert zp.context.socket.call_args == ((kwargs['sock_type'],), {})
+            if kwargs['bind'] and ':' in kwargs['addr']:
+                assert sock_mock.bind.call_args == (
+                        ('tcp://%s' % kwargs['addr'],), {})
+            elif kwargs['bind']:
+                assert sock_mock.bind_to_random_port.call_args == (
+                        ('tcp://%s' % kwargs['addr'],), {})
+            else:
+                assert sock_mock.connect.call_args == (
+                        ('tcp://%s:%s' % kwargs['addr'],), {})
+
+            # Check creation of the stream
+            assert zmqstream_mock.call_args == ((sock_mock, zp.loop), {})
+            assert zmqstream_mock.return_value.on_recv.call_args == (
+                    (kwargs['callback'],), {})
+
+            # Check default subscribtion
+            if 'subscribe' in kwargs:
+                assert sock_mock.setsockopt.call_args == (
+                        (zmq.SUBSCRIBE, kwargs['subscribe']), {})

example_app/zmqproc.py

+import multiprocessing
+
+from zmq.eventloop import ioloop, zmqstream
+import zmq
+
+
+class ZmqProcess(multiprocessing.Process):
+    """
+    This is the base for all processes and offers utility functions
+    for setup and creating new streams.
+
+    """
+    def __init__(self):
+        super().__init__()
+
+        self.context = None
+        """The ØMQ :class:`~zmq.Context` instance."""
+
+        self.loop = None
+        """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""
+
+    def setup(self):
+        """
+        Creates a :attr:`context` and an event :attr:`loop` for the process.
+
+        """
+        self.context = zmq.Context()
+        self.loop = ioloop.IOLoop.instance()
+
+    def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
+        """
+        Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
+
+        :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
+        :param addr: Address to bind or connect to formatted as *host:port*,
+                *(host, port)* or *host* (bind to random port).
+                If *bind* is ``True``, *host* may be:
+
+                - the wild-card ``*``, meaning all available interfaces,
+                - the primary IPv4 address assigned to the interface, in its
+                  numeric representation or
+                - the interface name as defined by the operating system.
+
+                If *bind* is ``False``, *host* may be:
+
+                - the DNS name of the peer or
+                - the IPv4 address of the peer, in its numeric representation.
+
+                If *addr* is just a host name without a port and *bind* is
+                ``True``, the socket will be bound to a random port.
+        :param bind: Binds to *addr* if ``True`` or tries to connect to it
+                otherwise.
+        :param callback: A callback for
+                :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
+        :param subscribe: Subscription pattern for *SUB* sockets, optional,
+                defaults to ``b''``.
+        :returns: A tuple containg the stream and the port number.
+
+        """
+        sock = self.context.socket(sock_type)
+
+        # addr may be 'host:port' or ('host', port)
+        if isinstance(addr, str):
+            addr = addr.split(':')
+        host, port = addr if len(addr) == 2 else (addr[0], None)
+
+        # Bind/connect the socket
+        if bind:
+            if port:
+                sock.bind('tcp://%s:%s' % (host, port))
+            else:
+                port = sock.bind_to_random_port('tcp://%s' % host)
+        else:
+            sock.connect('tcp://%s:%s' % (host, port))
+
+        # Add a default subscription for SUB sockets
+        if sock_type == zmq.SUB:
+            sock.setsockopt(zmq.SUBSCRIBE, subscribe)
+
+        # Create the stream and add the callback
+        stream = zmqstream.ZMQStream(sock, self.loop)
+        if callback:
+            stream.on_recv(callback)
+
+        return stream, int(port)

pongproc.py

-from zmq.utils import jsonapi as json
-import zmq
-
-import zmqproc
-
-
-host = '127.0.0.1'
-port = 5678
-
-
-def ping():
-    """Sends ping requests and waits for replies."""
-    context = zmq.Context()
-    sock = context.socket(zmq.REQ)
-    sock.connect('tcp://%s:%s' % (host, port))
-
-    for i in range(5):
-        sock.send_json(['ping', i])
-        rep = sock.recv_json()
-        print('Ping got reply:', rep)
-
-    sock.send_json(['plzdiekthxbye', None])
-
-
-class PongProc(zmqproc.ZmqProcess):
-    """
-    Main processes for the Ponger. It handles ping requests and sends back
-    a pong.
-
-    """
-    def __init__(self, bind_addr):
-        super().__init__()
-
-        self.bind_addr = bind_addr
-        self.rep_stream = None
-
-        # Make sure this is pickle-able (e.g., not using threads)
-        # or it won't work on Windows. If it's not pickle-able, instantiate
-        # it in setup().
-        self.ping_handler = PingHandler()
-
-    def setup(self):
-        """Sets up PyZMQ and creates all streams."""
-        super().setup()
-
-        self.rep_stream, _ = self.stream(zmq.REP, self.bind_addr, bind=True,
-                callback=self.handle_rep_stream)
-
-    def run(self):
-        """Sets up everything and starts the event loop."""
-        self.setup()
-        self.loop.start()
-
-    def stop(self):
-        """Stops the event loop."""
-        self.loop.stop()
-
-    def handle_rep_stream(self, msg):
-        """
-        Handles messages from a Pinger:
-
-        *ping*
-            Send back a pong.
-
-        *plzdiekthxbye*
-            Stop the ioloop and exit.
-
-        """
-        msg_type, data = json.loads(msg[0])
-
-        if msg_type == 'ping':
-            rep = self.ping_handler.make_pong(data)
-            self.rep_stream.send_json(rep)
-
-        elif msg_type == 'plzdiekthxbye':
-            self.stop()
-
-        else:
-            raise RuntimeError('Received unkown message type: %s' % msg_type)
-
-
-class PingHandler(object):
-
-    def make_pong(self, num_pings):
-        """Creates and returns a pong message."""
-        print('Pong got request number %s' % num_pings)
-
-        return ['pong', num_pings]
-
-
-if __name__ == '__main__':
-    pong_proc = PongProc(bind_addr=(host, port))
-    pong_proc.start()
-
-    ping()
-
-    pong_proc.join()
Add a comment to this file

test/__init__.py

Empty file removed.

test/data/pongproc.out

-Pong got request number 0
-Pong got request number 1
-Pong got request number 2
-Pong got request number 3
-Pong got request number 4
-Ping got reply: ['pong', 0]
-Ping got reply: ['pong', 1]
-Ping got reply: ['pong', 2]
-Ping got reply: ['pong', 3]
-Ping got reply: ['pong', 4]
Add a comment to this file

test/data/pongproc.txt

Empty file removed.

Add a comment to this file

test/process/__init__.py

Empty file removed.

test/process/conftest.py

-from inspect import isfunction, isgeneratorfunction
-
-
-def pytest_pycollect_makeitem(collector, name, obj):
-    """
-    Collects all instance methods that are generators and returns them as
-    normal function items.
-
-    """
-    if collector.funcnamefilter(name) and hasattr(obj, '__call__'):
-        if isfunction(obj) or isgeneratorfunction(obj):
-            return collector._genfunctions(name, obj)
-
-
-def pytest_runtest_call(item):
-    """
-    Passes the test generator (``item.obj``) to the ``run()`` method of the
-    generator's instance. This method should be inherited from
-    :class:`test.support.ProcessTest`.
-
-    """
-    if isgeneratorfunction(item.obj):
-        item.obj.__self__.run(item.obj)
-    else:
-        item.runtest()

test/process/test_pongproc.py

-import pytest
-import zmq
-
-from test.support import ProcessTest, make_sock
-import pongproc
-
-
-pytestmark = pytest.mark.process
-
-host = '127.0.0.1'
-port = 5678
-
-
-class TestProngProc(ProcessTest):
-    """Communication test for the Platform Manager process."""
-
-    def setup_method(self, method):
-        """
-        Creates and starts a pp process and sets up sockets to communicate
-        with it.
-
-        """
-        self.context = zmq.Context()
-
-        # Mimics the ping process
-        self.req_sock = make_sock(self.context, zmq.REQ,
-                                  connect=(host, port))
-
-        self.pp = pongproc.PongProc((host, port))
-        self.pp.start()
-
-    def teardown_method(self, method):
-        """
-        Sends a kill message to the pp and waits for the process to terminate.
-
-        """
-        # Send a stop message to the prong process and wait until it joins
-        self.req_sock.send_multipart([b'["plzdiekthxbye", null]'])
-        self.pp.join()
-
-        # Assert that no more messages are in the pipe
-        pytest.raises(zmq.ZMQError, self.req_sock.recv_multipart)
-
-        self.req_sock.close()
-
-    def test_ping(self):
-        """Tests a ping-pong sequence."""
-        yield ('send', self.req_sock, [], ['ping', 1])
-
-        reply = yield ('recv', self.req_sock)
-        assert reply == [['pong', 1]]

test/support.py

-"""
-This module contains various functions and classes to support testing.
-
-"""
-import time
-import sys
-
-from zmq.utils import jsonapi as json
-import zmq
-
-
-def make_sock(context, sock_type, bind=None, connect=None):
-    """
-    Creates a *sock_type* typed socket and binds or connects it to the given
-    address.
-
-    """
-    sock = TestSocket(context, sock_type)
-    if bind:
-        sock.bind('tcp://%s:%s' % bind)
-    elif connect:
-        sock.connect('tcp://%s:%s' % connect)
-
-    return sock
-
-
-def get_forwarder(func):
-    """Returns a simple wrapper for *func*."""
-    def forwarder(*args, **kwargs):
-        return func(*args, **kwargs)
-
-    return forwarder
-
-
-def get_wrapped_fwd(func):
-    """
-    Returns a wrapper, that tries to call *func* multiple time in non-blocking
-    mode before rasing an :class:`zmq.ZMQError`.
-
-    """
-    def forwarder(*args, **kwargs):
-        for i in range(100):
-            try:
-                rep = func(*args, flags=zmq.NOBLOCK, **kwargs)
-                return rep
-
-            except zmq.ZMQError:
-                time.sleep(0.01)
-
-        msg = 'Could not %s message.' % func.__name__[:4]
-        raise zmq.ZMQError(msg)
-
-    return forwarder
-
-
-class TestSocket(object):
-    """
-    Wraps ZMQ :class:`~zmq.core.socket.Socket`. All *recv* and *send* methods
-    will be called multiple times in non-blocking mode before a
-    :class:`zmq.ZMQError` is raised.
-
-    """
-    def __init__(self, context, sock_type):
-        self._context = context
-
-        sock = context.socket(sock_type)
-        self._sock = sock
-
-        forwards = [  # These methods can simply be forwarded
-            sock.bind,
-            sock.bind_to_random_port,
-            sock.connect,
-            sock.close,
-            sock.setsockopt,
-        ]
-        wrapped_fwd = [  # These methods are wrapped with a for loop
-            sock.recv,
-            sock.recv_json,
-            sock.recv_multipart,
-            sock.recv_unicode,
-            sock.send,
-            sock.send_json,
-            sock.send_multipart,
-            sock.send_unicode,
-        ]
-
-        for func in forwards:
-            setattr(self, func.__name__, get_forwarder(func))
-
-        for func in wrapped_fwd:
-            setattr(self, func.__name__, get_wrapped_fwd(func))
-
-
-class ProcessTest(object):
-    """
-    Base class for process tests. It offers basic actions for sending and
-    receiving messages and implements the *run* methods that handles the
-    actual test generators.
-
-    """
-    def send(self, socket, header, body, extra_data=[]):
-        """
-        JSON-encodes *body*, concatenates it with *header*, appends
-        *extra_data* and sends it as multipart message over *socket*.
-
-        *header* and *extra_data* should be lists containg byte objects or
-        objects implementing the buffer interface (like NumPy arrays).
-
-        """
-        socket.send_multipart(header + [json.dumps(body)] + extra_data)
-
-    def recv(self, socket, json_load_index=-1):
-        """
-        Receives and returns a multipart message from *socket* and tries to
-        JSON-decode the item at position *json_load_index* (defaults to ``-1``;
-        the last element in the list). The original byte string will be
-        replaced by the loaded object. Set *json_load_index* to ``None`` to get
-        the original, unchanged message.
-
-        """
-        msg = socket.recv_multipart()
-        if json_load_index is not None:
-            msg[json_load_index] = json.loads(msg[json_load_index])
-        return msg
-
-    def run(self, testfunc):
-        """
-        Iterates over the *testfunc* generator and executes all actions it
-        yields. Results will be sent back into the generator.
-
-        :param testfunc: A generator function that yields tuples containing
-                an action keyword, which should be a function of this or
-                the inheriting class (like ``send`` or ``recv``) and additional
-                parameters that will be passed to that function, e.g.:
-                ``('send', socket_obj, ['header'], 'body')``
-        :type testfunc:  generatorfunction
-
-        """
-        item_gen = testfunc()
-        item = next(item_gen)
-
-        def throw_err(skip_levels=0):
-            """
-            Throws the last error to *item_gen* and skips *skip_levels* in
-            the traceback to point to the line that yielded the last event.
-
-            """
-            etype, evalue, tb = sys.exc_info()
-            for i in range(skip_levels):
-                tb = tb.tb_next
-            item_gen.throw(etype, evalue, tb)
-
-        try:
-            while True:
-                try:
-                    ret = getattr(self, item[0])(*item[1:])
-                    item = item_gen.send(ret)
-
-                except zmq.ZMQError:
-                    throw_err(3)  # PyZMQ could not send/recv
-                except AssertionError:
-                    throw_err(1)  # Error in the test
-        except StopIteration:
-            pass
Add a comment to this file

test/system/__init__.py

Empty file removed.

test/system/test_pongproc.py

-import os.path
-import subprocess
-
-import pytest
-
-
-pytestmark = pytest.mark.system
-
-
-def test_pongproc():
-    filename = os.path.join('test', 'data', 'pongproc.out')
-    expected = open(filename).read()
-
-    output = subprocess.check_output(['python', 'pongproc.py'],
-                                     universal_newlines=True)
-
-    assert output == expected

test/test_pongproc.py

-from zmq.utils import jsonapi as json
-import mock
-import pytest
-import zmq
-
-import pongproc
-
-
-host = '127.0.0.1'
-port = 5678
-
-
-def pytest_funcarg__pp(request):
-    """Creates a PongProc instance."""
-    return pongproc.PongProc((host, port))
-
-
-def pytest_funcarg__ph(request):
-    """Creates a PingHandler instance."""
-    return pongproc.PingHandler()
-
-
-class TestPongProc(object):
-    """Tests :class:`pongproc.PongProc`."""
-
-    def test_setup(self, pp):
-        pp.stream = mock.Mock(side_effect=lambda *a, **k: (a[0], mock.Mock()))
-
-        with mock.patch('zmqproc.ZmqProcess.setup') as setup_mock:
-            pp.setup()
-            assert setup_mock.call_count == 1
-
-        assert pp.stream.call_args_list == [
-            ((zmq.REP, (host, port)),
-                dict(bind=True, callback=pp.handle_rep_stream)),
-        ]
-        assert pp.rep_stream == zmq.REP
-
-    def test_run(self, pp):
-        pp.setup = mock.Mock()
-        pp.loop = mock.Mock()
-
-        pp.run()
-
-        assert pp.setup.call_count == 1
-        assert pp.loop.start.call_count == 1
-
-    def test_stop(self, pp):
-        pp.loop = mock.Mock()
-        pp.stop()
-        assert pp.loop.stop.call_count == 1
-
-    @pytest.mark.parametrize(('handler', 'msg'), [
-        ('handle_rep_stream', ['["spam", []]']),
-    ])
-    def test_handle_bad_msg(self, pp, handler, msg):
-        pytest.raises(RuntimeError, getattr(pp, handler), msg)
-
-    def test_stop_msg(self, pp):
-        pp.stop = mock.Mock()
-        pp.handle_rep_stream([b'["plzdiekthxbye", null]'])
-        assert pp.stop.call_count == 1
-
-    def test_ping(self, pp):
-        msg = ['ping', 1]
-        retval = 'spam'
-        pp.ping_handler = mock.Mock(spec_set=pongproc.PingHandler)
-        pp.ping_handler.make_pong.return_value = retval
-        pp.rep_stream = mock.Mock()
-
-        pp.handle_rep_stream([json.dumps(msg)])
-
-        assert pp.ping_handler.make_pong.call_args == ((msg[1],), {})
-        assert pp.rep_stream.send_json.call_args == ((retval,), {})
-
-
-class TestPingHandler(object):
-
-    def test_make_pong(self, ph):
-        ping_num = 23
-
-        ret = ph.make_pong(ping_num)
-
-        assert ret == ['pong', ping_num]

test/test_zmqproc.py

-from zmq.eventloop import ioloop
-import mock
-import pytest
-import zmq
-
-import zmqproc
-
-
-class TestZmqProcess(object):
-    """Tests for :class:`zmqproc.ZmqProcess`."""
-
-    def test_setup(self):
-        zp = zmqproc.ZmqProcess()
-        zp.setup()
-
-        assert isinstance(zp.context, zmq.Context)
-        assert isinstance(zp.loop, ioloop.IOLoop)
-
-    @pytest.mark.parametrize('kwargs', [
-        dict(sock_type=23, addr='127.0.0.1:1234', bind=True,
-              callback=mock.Mock()),
-        dict(sock_type=23, addr='127.0.0.1', bind=True,
-              callback=mock.Mock()),
-        dict(sock_type=zmq.SUB, addr=('localhost', 1234), bind=False,
-              callback=mock.Mock(), subscribe=b'ohai'),
-    ])
-    def test_stream(self, kwargs):
-        zp = zmqproc.ZmqProcess()
-
-        # Patch the ZmqProcess instance
-        zp.context = mock.Mock(spec_set=zmq.Context)
-        zp.loop = mock.Mock(spec_set=ioloop.IOLoop)
-        sock_mock = zp.context.socket.return_value
-        sock_mock.bind_to_random_port.return_value = 42
-
-        # Patch ZMQStream and start testing
-        with mock.patch('zmq.eventloop.zmqstream.ZMQStream') as zmqstream_mock:
-            stream, port = zp.stream(**kwargs)
-
-            # Assert that the return values are correct
-            assert stream is zmqstream_mock.return_value
-            if isinstance(kwargs['addr'], tuple):
-                assert port == kwargs['addr'][1]
-            elif ':' in kwargs['addr']:
-                assert port == int(kwargs['addr'][-4:])
-            else:
-                assert port == sock_mock.bind_to_random_port.return_value
-
-            # Check that the socket was crated correctly
-            assert zp.context.socket.call_args == ((kwargs['sock_type'],), {})
-            if kwargs['bind'] and ':' in kwargs['addr']:
-                assert sock_mock.bind.call_args == (
-                        ('tcp://%s' % kwargs['addr'],), {})
-            elif kwargs['bind']:
-                assert sock_mock.bind_to_random_port.call_args == (
-                        ('tcp://%s' % kwargs['addr'],), {})
-            else:
-                assert sock_mock.connect.call_args == (
-                        ('tcp://%s:%s' % kwargs['addr'],), {})
-
-            # Check creation of the stream
-            assert zmqstream_mock.call_args == ((sock_mock, zp.loop), {})
-            assert zmqstream_mock.return_value.on_recv.call_args == (
-                    (kwargs['callback'],), {})
-
-            # Check default subscribtion
-            if 'subscribe' in kwargs:
-                assert sock_mock.setsockopt.call_args == (
-                        (zmq.SUBSCRIBE, kwargs['subscribe']), {})

zmqproc.py

-import multiprocessing
-
-from zmq.eventloop import ioloop, zmqstream
-import zmq
-
-
-class ZmqProcess(multiprocessing.Process):
-    """
-    This is the base for all processes and offers utility functions
-    for setup and creating new streams.
-
-    """
-    def __init__(self):
-        super().__init__()
-
-        self.context = None
-        """The ØMQ :class:`~zmq.Context` instance."""
-
-        self.loop = None
-        """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""
-
-    def setup(self):
-        """
-        Creates a :attr:`context` and an event :attr:`loop` for the process.
-
-        """
-        self.context = zmq.Context()
-        self.loop = ioloop.IOLoop.instance()
-
-    def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
-        """
-        Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
-
-        :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
-        :param addr: Address to bind or connect to formatted as *host:port*,
-                *(host, port)* or *host* (bind to random port).
-                If *bind* is ``True``, *host* may be:
-
-                - the wild-card ``*``, meaning all available interfaces,
-                - the primary IPv4 address assigned to the interface, in its
-                  numeric representation or
-                - the interface name as defined by the operating system.
-
-                If *bind* is ``False``, *host* may be:
-
-                - the DNS name of the peer or
-                - the IPv4 address of the peer, in its numeric representation.
-
-                If *addr* is just a host name without a port and *bind* is
-                ``True``, the socket will be bound to a random port.
-        :param bind: Binds to *addr* if ``True`` or tries to connect to it
-                otherwise.
-        :param callback: A callback for
-                :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
-        :param subscribe: Subscription pattern for *SUB* sockets, optional,
-                defaults to ``b''``.
-        :returns: A tuple containg the stream and the port number.
-
-        """
-        sock = self.context.socket(sock_type)
-
-        # addr may be 'host:port' or ('host', port)
-        if isinstance(addr, str):
-            addr = addr.split(':')
-        host, port = addr if len(addr) == 2 else (addr[0], None)
-
-        # Bind/connect the socket
-        if bind:
-            if port:
-                sock.bind('tcp://%s:%s' % (host, port))
-            else:
-                port = sock.bind_to_random_port('tcp://%s' % host)
-        else:
-            sock.connect('tcp://%s:%s' % (host, port))
-
-        # Add a default subscription for SUB sockets
-        if sock_type == zmq.SUB:
-            sock.setsockopt(zmq.SUBSCRIBE, subscribe)
-
-        # Create the stream and add the callback
-        stream = zmqstream.ZMQStream(sock, self.loop)
-        if callback:
-            stream.on_recv(callback)
-
-        return stream, int(port)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.