Designing and Testing PyZMQ Applications – Part 1
=================================================

`ZeroMQ <http://www.zeromq.org/>`_ (or ØMQ or ZMQ) is an intelligent messaging
framework that is `described <http://zguide.zeromq.org/page:all#Fixing-the-World>`_
as “sockets on steroids”. That is, its sockets look like normal TCP sockets but
actually work as you’d expect sockets to work. `PyZMQ
<http://www.zeromq.org/bindings:python>`_ adds even more convenience to them,
which makes it a really good choice if you want to implement a distributed
application. Another big plus for ØMQ is that you can integrate sub-systems
written in C, Java or any other language ØMQ supports (and there are a lot of
them).

If you’ve never heard of ØMQ before, I recommend reading `ZeroMQ an
Introduction <http://nichol.as/zeromq-an-introduction>`_ by Nicholas Piël
before you go on with this article.

The `ØMQ Guide <http://zguide.zeromq.org/page:all>`_ and `PyZMQ’s documentation
<http://zeromq.github.com/pyzmq/>`_ are really good, so you can easily get
started. However, when we began to implement a larger application with it (a
distributed simulation framework), several questions arose which were not
covered by the documentation:

- What’s the best way to design our application?
- How can we keep it readable, flexible and maintainable?
- How do we test it?

I couldn’t find a best-practice article that answered my questions. So in this
series of articles, I’m going to talk about what I’ve learned during the last
months. I’m not a PyZMQ expert (yet ;-)), but what I’ve done so far works quite
well, and I have never had more tests in a project than I have now.

You’ll find the source for the examples at `bitbucket
<https://bitbucket.org/ssc/pyzmq-article>`_. They are written in Python 3.2 and
tested under Mac OS X Lion, Ubuntu 11.10 and Windows 7, 64 bit in each case.
If you have any suggestions or improvements, please fork me or just leave a
`comment <#comments>`_.

In this first article, I’m going to talk a bit about how you could generally
design your application to be flexible, maintainable and testable. The second
part will be about unit testing, and finally I’ll cover process and system
testing.


Comparison of Different Approaches
----------------------------------

There are basically three possible ways to implement a PyZMQ application: one
that’s easy but of limited practical use, one that’s more flexible but not
really pythonic, and one that needs a bit more setup but is both flexible and
pythonic.

All three examples feature a simple ping process and a pong process with
varying complexity. I use multiprocessing to run the pong process, because
that’s what you should usually do in real PyZMQ applications (you don’t want to
use threads and if both processes are running on the same machine, there’s no
need to invoke both of them separately).

All of the examples will have the following output:

.. code-block::  bash

    (zmq)$ python blocking_recv.py
    Pong got request: ping 0
    Ping got reply: pong 0
    ...
    Pong got request: ping 4
    Ping got reply: pong 4

Let’s start with the easy one first. You just use one of the socket’s recv
methods in a loop:

.. code-block:: python

    # blocking_recv.py
    import multiprocessing
    import zmq


    addr = 'tcp://127.0.0.1:5678'


    def ping():
        """Sends ping requests and waits for replies."""
        context = zmq.Context()
        sock = context.socket(zmq.REQ)
        sock.bind(addr)

        for i in range(5):
            sock.send_unicode('ping %s' % i)
            rep = sock.recv_unicode()  # This blocks until we get something
            print('Ping got reply:', rep)


    def pong():
        """Waits for ping requests and replies with a pong."""
        context = zmq.Context()
        sock = context.socket(zmq.REP)
        sock.connect(addr)

        for i in range(5):
            req = sock.recv_unicode()  # This also blocks
            print('Pong got request:', req)
            sock.send_unicode('pong %s' % i)


    if __name__ == '__main__':
        pong_proc = multiprocessing.Process(target=pong)
        pong_proc.start()

        ping()

        pong_proc.join()


So this is very easy and not that much code. The problem is that it only works
well if your process uses just one socket. Unfortunately, that is rarely the
case in larger applications.

A way to handle multiple sockets per process is polling. In addition to your
context and socket(s), you need a *poller*. You also have to tell it which
events on which socket you are going to poll:

.. code-block:: python

    # polling.py
    import zmq


    addr = 'tcp://127.0.0.1:5678'


    def pong():
        """Waits for ping requests and replies with a pong."""
        context = zmq.Context()
        sock = context.socket(zmq.REP)
        sock.bind(addr)

        # Create a poller and register the events we want to poll
        poller = zmq.Poller()
        poller.register(sock, zmq.POLLIN|zmq.POLLOUT)

        for i in range(10):
            # Get all sockets that can do something
            socks = dict(poller.poll())

            # Check if we can receive something
            if sock in socks and socks[sock] == zmq.POLLIN:
                req = sock.recv_unicode()
                print('Pong got request:', req)

            # Check if we can send something
            if sock in socks and socks[sock] == zmq.POLLOUT:
                sock.send_unicode('pong %s' % (i // 2))

        poller.unregister(sock)


As you can see, our *pong* function got pretty ugly. You need 10 iterations to
do five ping-pongs, because in each iteration you can either receive or send.
And each socket you add to your process adds two more if-statements. You could
improve that design by creating a base class that wraps the polling loop, and
then just registering sockets and callbacks in an inheriting class.
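
A minimal sketch of such a base class could look like the following. The class
name *PollingLoop* and its methods are made up for this illustration. The
poller is passed in and is only assumed to offer ``register(sock, flags)`` and
``poll(timeout)`` returning ``(sock, event)`` pairs, which is what
``zmq.Poller`` does:

.. code-block:: python

    class PollingLoop:
        """Polls registered sockets and calls a callback for each ready one.

        *poller* is assumed to behave like ``zmq.Poller``.

        """
        def __init__(self, poller):
            self.poller = poller
            self.callbacks = {}  # Maps each socket to (flags, callback)
            self.running = False

        def register(self, sock, flags, callback):
            """Polls *sock* for *flags*, calls ``callback(sock)`` when ready."""
            self.poller.register(sock, flags)
            self.callbacks[sock] = (flags, callback)

        def stop(self):
            """Leaves the loop after the current round of callbacks."""
            self.running = False

        def run(self, timeout=None):
            """Dispatches events until :meth:`stop` is called."""
            self.running = True
            while self.running:
                for sock, event in self.poller.poll(timeout):
                    flags, callback = self.callbacks[sock]
                    if event & flags:
                        callback(sock)

An inheriting class would create its sockets, call something like
``self.register(sock, zmq.POLLIN, self.handle_ping)`` for each of them and
then call *run()*. The if-statements from the example above then live in one
place only.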

That brings us to our final example. PyZMQ comes with an `adapted Tornado
eventloop <http://zeromq.github.com/pyzmq/eventloop.html>`_ that handles the
polling and works with `ZMQStreams
<http://zeromq.github.com/pyzmq/api/generated/zmq.eventloop.zmqstream.html#zmq.eventloop.zmqstream.ZMQStream>`_,
which wrap sockets and add some functionality:

.. code-block:: python

    # eventloop.py
    import multiprocessing

    from zmq.eventloop import ioloop, zmqstream
    import zmq


    addr = 'tcp://127.0.0.1:5678'


    class Pong(multiprocessing.Process):
        """Waits for ping requests and replies with a pong."""
        def __init__(self):
            super().__init__()
            self.loop = None
            self.stream = None
            self.i = 0

        def run(self):
            """
            Initializes the event loop, creates the sockets/streams and
            starts the (blocking) loop.

            """
            context = zmq.Context()
            self.loop = ioloop.IOLoop.instance()  # This is the event loop

            sock = context.socket(zmq.REP)
            sock.bind(addr)
            # We need to create a stream from our socket and
            # register a callback for recv events.
            self.stream = zmqstream.ZMQStream(sock, self.loop)
            self.stream.on_recv(self.handle_ping)

            # Start the loop. It runs until we stop it.
            self.loop.start()

        def handle_ping(self, msg):
            """Handles ping requests and sends back a pong."""
            # msg is a list of bytes objects
            req = msg[0].decode()
            print('Pong got request:', req)
            self.stream.send_unicode('pong %s' % self.i)

            # We’ll stop the loop after 5 pings
            self.i += 1
            if self.i == 5:
                self.stream.flush()
                self.loop.stop()


This adds even more boilerplate code, but it will pay off if you use more
sockets, and most of that stuff in *run()* can be put into a base class. Another
drawback is that the IOLoop only uses `recv_multipart()
<http://zeromq.github.com/pyzmq/api/generated/zmq.core.socket.html#zmq.core.socket.Socket.recv_multipart>`_.
So you always get a list of byte strings which you have to decode or
deserialize on your own. However, you can use all the *send* methods that a
socket offers (like *send_unicode()* or *send_json()*). You can also stop the
loop from within a message handler.

In the next sections, I’ll discuss how you could implement a PyZMQ process that
uses the event loop.


Communication Design
--------------------

Before you start to implement anything, you should think about what kind of
processes you need in your application and which messages they exchange. You
should also decide what kind of message format and serialization you want to
use.

PyZMQ has built-in support for Unicode (*send* sends plain C strings which map
to Python *bytes* objects, so there’s a separate method to send Unicode
strings), JSON and Pickle.

JSON is nice, because it’s fast and lets you integrate processes written in
other languages into your application. It’s also a bit safer, because you cannot
receive arbitrary objects as with `pickle
<http://docs.python.org/py3k/library/pickle#restricting-globals>`_.  The most
straightforward format for JSON messages is a triple *[msg_type, args,
kwargs]*, where *msg_type* maps to a method name and *args* and *kwargs*
get passed as positional and keyword arguments.
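
To make the triple format more concrete, here is a small sketch that needs
nothing but the standard library. The *Calculator* class and the *dispatch*
function are hypothetical names chosen for this illustration. They only show
how *msg_type* can be mapped to a method and how *args* and *kwargs* are
passed on:

.. code-block:: python

    import json


    class Calculator:
        """Hypothetical handler; its method names double as message types."""

        def add(self, a, b, scale=1):
            return (a + b) * scale


    def dispatch(handler, raw):
        """Decodes a ``[msg_type, args, kwargs]`` message and calls *handler*."""
        msg_type, args, kwargs = json.loads(raw)

        # Refuse private names so that a peer cannot reach internals
        if msg_type.startswith('_'):
            raise ValueError('Invalid message type: %s' % msg_type)

        method = getattr(handler, msg_type)
        return method(*args, **kwargs)


    raw = json.dumps(['add', [1, 2], {'scale': 10}])
    result = dispatch(Calculator(), raw)
    print(result)  # 30

Looking up methods by name keeps the dispatcher short, but it also means that
every public method of the handler can be called by a peer. An explicit dict of
allowed message types is the stricter alternative.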

I strongly recommend documenting each chain of messages your application
sends to perform a certain task. I do this with fancy PowerPoint graphics and
with even fancier ASCII art in `Sphinx <http://sphinx.pocoo.org>`_. Here is how
I would document our ping-pong:

.. code-block:: rst

    Sending pings
    -------------

    * If the ping process sends a *ping*, the pong process responds with a
      *pong*.
    * The number of pings (and pongs) is counted. The current ping count is
      sent with each message.

    ::

        PingProc      PongProc
         [REQ] ---1--> [REP]
               <--2---


        1 IN : ['ping, count']
        1 OUT: ['ping, count']

        2 IN : ['pong, count']
        2 OUT: ['pong, count']

First, I write some bullet points that explain how the processes behave and why
they behave this way. This is followed by some kind of sequence diagram that
shows when which process sends which message using which socket type. Finally,
I write down what the messages look like. ``# IN`` is what you would pass to
*send_multipart* and ``# OUT`` is what is received on the other side by
*recv_multipart*. If one of the participating sockets is a *ROUTER* or
*DEALER*, *IN* and *OUT* will differ (though that’s not the case in this
example). Everything in single quotation marks (``'``) represents a JSON
serialized list.

If our pong process used a *ROUTER* socket instead of the *REP* socket, it
would look like this::

    1 IN : ['ping, count']
    1 OUT: [ping_uuid, '', 'ping, count']

    2 IN : [ping_uuid, '', 'pong, count']
    2 OUT: ['pong, count']


This seems like a lot of tedious work, but trust me, it really helps a lot
when you need to change something a few weeks later!
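
Coming back to the *ROUTER* variant of the message chain: a handler on the
*ROUTER* side has to separate the routing frames from the payload and put them
back in front of its reply. The following sketch does this on plain lists of
byte strings, so it runs without any sockets. The function name
*split_envelope* and the frame contents are made up for this illustration:

.. code-block:: python

    def split_envelope(frames):
        """Splits a multipart message at the first empty delimiter frame.

        Returns a tuple ``(routing_frames, payload_frames)``.

        """
        idx = frames.index(b'')
        return frames[:idx], frames[idx + 1:]


    # "1 OUT" as the ROUTER socket would receive it
    received = [b'ping_uuid', b'', b'["ping", 0]']
    routing, payload = split_envelope(received)

    # "2 IN": reuse the routing frames so that the reply finds its way back
    reply = routing + [b'', b'["pong", 0]']

Here, *routing* is ``[b'ping_uuid']`` and *payload* is ``[b'["ping", 0]']``.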


Application Design
------------------

In the examples above, the *Pong* process was responsible for setting
everything up, for receiving/sending messages and for the actual application
logic (counting incoming pings and creating a pong).

Obviously, this is not a very good design. What we can do about this is to put
most of that nasty setup stuff into a base class which all your processes can
inherit from, and to put all the actual application logic into a separate
(PyZMQ independent) class.

ZmqPocess – The Base Class for all Processes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The base class basically implements two things:

- a *setup* method that creates a context and a loop
- a *stream* factory method for streams with an *on_recv* callback. It creates
  a socket and can connect/bind it to a given address or bind it to a random
  port (that’s why it returns the port number in addition to the stream
  itself).

It also inherits from `multiprocessing.Process
<http://docs.python.org/py3k/library/multiprocessing#process-and-exceptions>`_
so that it is easier to spawn it as a sub-process. Of course, you can also just
call its *run()* method from your *main()*.

.. code-block:: python

    # zmqproc.py
    import multiprocessing

    from zmq.eventloop import ioloop, zmqstream
    import zmq


    class ZmqProcess(multiprocessing.Process):
        """
        This is the base for all processes and offers utility functions
        for setup and creating new streams.

        """
        def __init__(self):
            super().__init__()

            self.context = None
            """The ØMQ :class:`~zmq.Context` instance."""

            self.loop = None
            """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""

        def setup(self):
            """
            Creates a :attr:`context` and an event :attr:`loop` for the process.

            """
            self.context = zmq.Context()
            self.loop = ioloop.IOLoop.instance()

        def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
            """
            Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.

            :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
            :param addr: Address to bind or connect to formatted as *host:port*,
                    *(host, port)* or *host* (bind to random port).
                    If *bind* is ``True``, *host* may be:

                    - the wild-card ``*``, meaning all available interfaces,
                    - the primary IPv4 address assigned to the interface, in its
                      numeric representation or
                    - the interface name as defined by the operating system.

                    If *bind* is ``False``, *host* may be:

                    - the DNS name of the peer or
                    - the IPv4 address of the peer, in its numeric representation.

                    If *addr* is just a host name without a port and *bind* is
                    ``True``, the socket will be bound to a random port.
            :param bind: Binds to *addr* if ``True`` or tries to connect to it
                    otherwise.
            :param callback: A callback for
                    :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
            :param subscribe: Subscription pattern for *SUB* sockets, optional,
                    defaults to ``b''``.
            :returns: A tuple containing the stream and the port number.

            """
            sock = self.context.socket(sock_type)

            # addr may be 'host:port' or ('host', port)
            if isinstance(addr, str):
                addr = addr.split(':')
            host, port = addr if len(addr) == 2 else (addr[0], None)

            # Bind/connect the socket
            if bind:
                if port:
                    sock.bind('tcp://%s:%s' % (host, port))
                else:
                    port = sock.bind_to_random_port('tcp://%s' % host)
            else:
                sock.connect('tcp://%s:%s' % (host, port))

            # Add a default subscription for SUB sockets
            if sock_type == zmq.SUB:
                sock.setsockopt(zmq.SUBSCRIBE, subscribe)

            # Create the stream and add the callback
            stream = zmqstream.ZMQStream(sock, self.loop)
            if callback:
                stream.on_recv(callback)

            return stream, int(port)
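
The address handling at the top of *stream()* is easy to check in isolation if
it is pulled out into a function. The following sketch repeats that logic in a
hypothetical helper called *split_addr* (the name is not part of the original
code) and shows what the three supported address formats turn into:

.. code-block:: python

    def split_addr(addr):
        """Returns ``(host, port)``; *port* is ``None`` if *addr* has no port."""
        # addr may be 'host:port', ('host', port) or just 'host'
        if isinstance(addr, str):
            addr = addr.split(':')
        host, port = addr if len(addr) == 2 else (addr[0], None)
        return host, port


    print(split_addr('127.0.0.1:5678'))     # ('127.0.0.1', '5678')
    print(split_addr(('127.0.0.1', 5678)))  # ('127.0.0.1', 5678)
    print(split_addr('*'))                  # ('*', None)

Note that the port stays a string if the address was passed as a string, which
is why *stream()* converts it with ``int(port)`` before returning it.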


PongProc – The Actual Process
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The *PongProc* inherits from *ZmqProcess* and is the main class for our
process. It creates the streams, starts the event loop and dispatches all
messages to the appropriate handlers:

.. code-block:: python

    # pongproc.py
    from zmq.utils import jsonapi as json
    import zmq

    import zmqproc


    host = '127.0.0.1'
    port = 5678


    class PongProc(zmqproc.ZmqProcess):
        """
        Main process for the Ponger. It handles ping requests and sends back
        a pong.

        """
        def __init__(self, bind_addr):
            super().__init__()

            self.bind_addr = bind_addr
            self.rep_stream = None

            # Make sure this is pickle-able (e.g., not using threads)
            # or it won't work on Windows. If it's not pickle-able, instantiate
            # it in setup().
            self.ping_handler = PingHandler()

        def setup(self):
            """Sets up PyZMQ and creates all streams."""
            super().setup()

            self.rep_stream, _ = self.stream(zmq.REP, self.bind_addr, bind=True,
                    callback=self.handle_rep_stream)

        def run(self):
            """Sets up everything and starts the event loop."""
            self.setup()
            self.loop.start()

        def stop(self):
            """Stops the event loop."""
            self.loop.stop()

        def handle_rep_stream(self, msg):
            """
            Handles messages from a Pinger:

            *ping*
                Send back a pong.

            *plzdiekthxbye*
                Stop the ioloop and exit.

            """
            msg_type, data = json.loads(msg[0])

            if msg_type == 'ping':
                rep = self.ping_handler.make_pong(data)
                self.rep_stream.send_json(rep)

            elif msg_type == 'plzdiekthxbye':
                self.stop()

            else:
                raise RuntimeError('Received unknown message type: %s' % msg_type)

There are a couple of things to note here:

- I instantiated the *PingHandler* in the process’ *__init__* method. If you
  are going to start this process as a sub-process via *start*, make sure
  everything you instantiate in *__init__* is pickle-able or it won’t work on
  Windows (Linux and Mac OS X use *fork* to create a sub-process and *fork*
  just makes a copy of the main process and gives it a new process ID. `On
  Windows <http://docs.python.org/py3k/library/multiprocessing#windows>`_,
  there is no *fork* and the context of your main process is pickled and sent
  to the sub-process).

- In *setup*, call ``super().setup()`` before you create a stream or you
  won’t have a loop instance for them. You don’t call *setup* in
  the process’ *__init__*, because the context must be created within the
  new system process. So we call *setup* in *run*.

- The *stop* method is not really necessary in this example, but it can be used
  to send stop messages to sub-processes when the main process terminates and
  to do other kinds of clean-up. You can also call it if you catch a
  ``KeyboardInterrupt`` after calling *run*.

- *handle_rep_stream* is the message dispatcher for the process’ *REP* stream.
  It parses the message and calls the appropriate handler for that message (or
  raises an error if the message type is invalid). If your *if* and *elif*
  branches all do the same thing, you might consider replacing them with a dict
  that contains the handlers for each message type:

  .. code-block:: python

    handlers = {
        'msg': self.handler_for_msg,
    }
    try:
        handler = handlers[msg_type]
    except KeyError:
        raise RuntimeError('Received unknown message type: %s' % msg_type)

    # Send the handler's reply, not the request we just received
    rep = handler(data)
    self.rep_stream.send_json(rep)
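
Regarding the first point in the list above: you can find out early whether an
object is pickle-able by simply trying to pickle it. The helper
*is_picklable* below is a hypothetical name and only a rough check, not
something that PyZMQ or *multiprocessing* provides. Newer Python versions also
pickle the process object on Mac OS X by default, so the check is useful beyond
Windows:

.. code-block:: python

    import pickle
    import threading


    def is_picklable(obj):
        """Returns ``True`` if *obj* survives ``pickle.dumps()``."""
        try:
            pickle.dumps(obj)
        except Exception:
            return False
        return True


    # Plain data can be sent to the sub-process ...
    print(is_picklable({'host': '127.0.0.1', 'port': 5678}))  # True

    # ... but a lock (and anything that holds one) cannot
    print(is_picklable(threading.Lock()))  # False

If the check fails for one of your handlers, instantiate it in *setup()*
instead of *__init__*.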

PingHandler – The Application Logic
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The *PingHandler* contains the actual application logic (which is not much,
in this example). The *make_pong* method just gets the number of pings sent
with the *ping* message and creates a new *pong* message. The serialization
is done by *PongProc*, so our Handler does not depend on PyZMQ:

.. code-block:: python

    class PingHandler(object):

        def make_pong(self, num_pings):
            """Creates and returns a pong message."""
            print('Pong got request number %s' % num_pings)

            return ['pong', num_pings]
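
Because the handler does not know anything about PyZMQ, you can exercise it
without creating a context, a socket or a sub-process. The following sketch
repeats the class (without the *print* call) to stay self-contained and checks
the reply with plain asserts:

.. code-block:: python

    import json


    class PingHandler(object):
        """Same logic as above, repeated to keep this snippet self-contained."""

        def make_pong(self, num_pings):
            """Creates and returns a pong message."""
            return ['pong', num_pings]


    handler = PingHandler()
    rep = handler.make_pong(3)
    assert rep == ['pong', 3]

    # The reply is plain data, so it survives a JSON round trip unchanged
    assert json.loads(json.dumps(rep)) == ['pong', 3]
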


Summary
-------

Okay, that’s it for now. I showed you three ways to use PyZMQ. If you have a
very simple process with only one socket, you can easily use its blocking
*recv* methods. If you need more than one socket, I recommend using the event
loop. And polling … you don’t want to use that.

If you decide to use PyZMQ’s event loop, you should separate the application
logic from all the PyZMQ stuff (like creating streams, sending/receiving
messages and dispatching them). If your application consists of more than one
process (which is usually the case), you should also create a base class with
shared functionality for them.

In the next part, I’m going to talk about how you can test your application.