everything up, for receiving/sending messages and for the actual application
logic (counting incoming pings and creating a pong).
-Obviously, this is not a very good design. What we can do about this is to put
-most of that nasty setup stuff into a base class which all your processes can
-inherit from, and to put all the actual application logic into a separate
-(PyZMQ independent) class.
+Obviously, this is not a very good design (at least if your application gets
+more complex than our little ping-pong example). What we can do about this is
+to put most of that nasty setup stuff into a base class which all your
+processes can inherit from, separate message handling and (de)serialization
+from it and finally put all the actual application logic into a separate
+(PyZMQ-independent) class. This will result in a three-level architecture:
+#. The lowest tier will contain the entry point of the process, set-up
+ everything and start the event loop. A common base class provides utilities
+ for creating sockets/streams and setting everything up.
+#. The second level is message handling and (de) serialization. A base class
+ performs the (de)serialization and error handling. A message handler
+ inherits this class and implements a method for each message type that
+#. The third level will be the application logic and completely PyZMQ-agnostic.
+Base classes should be defined for the first two tiers two reduce redundant
+code in multiple processes or message handlers. The following figure shows the
+five classes our process is going to consist of:
+.. figure:: pongproc_architecture.png
+ :alt: The architecture of our pong process.
+ The refactored PongProc now consists of three layers. The main class
+ *PongProc* inherits *ZMQProcess*. Every stream gets a *MessageHandler*. In
+ our example it’s just *RepStreamHandler*. Finally, you can have one ore more
+ classes containing the (PyZMQ-agnostic) application logic. In our example,
+ it’s called *PingHandler*, because it handles incoming pings.
ZmqPocess – The Base Class for all Processes
from zmq.eventloop import ioloop, zmqstream
- the wild-card ``*``, meaning all available interfaces,
- the primary IPv4 address assigned to the interface, in its
numeric representation or
+ numeric representation or
- the interface name as defined by the operating system.
If *bind* is ``False``, *host* may be:
- from zmq.utils import jsonapi as json
+ # example_app/pongproc.py
- class PongProc(
+ class PongProc(.ZmqProcess):
Main processes for the Ponger. It handles ping requests and sends back
self.bind_addr = bind_addr
- # Make sure this is pickle-able (e.g., not using threads)
- # or it won't work on Windows. If it's not pickle-able, instantiate
self.ping_handler = PingHandler()
"""Sets up PyZMQ and creates all streams."""
- self.rep_stream, _ = self.stream(zmq.REP, self.bind_addr, bind=True,
+ # Create the stream and add the message handler
+ self.rep_stream, _ = self.stream(zmq.REP, self.bind_addr, bind=True)
+ self.rep_stream.on_recv(RepStreamHandler(self.rep_stream, self.stop,
"""Sets up everything and starts the event loop."""
"""Stops the event loop."""
- def handle_rep_stream(self, msg):
+If you are going to start this process as a sub-process via *start*, make sure
+everything you instantiate in *__init__* is pickle-able or it won’t work on
+Windows (Linux and Mac OS X use *fork* to create a sub-process and *fork* just
+makes a copy of the main process and gives it a new process ID. `On Windows
+<http://docs.python.org/py3k/library/multiprocessing#windows>`_, there is no
+*fork* and the context of your main process is pickled and sent to the
+In *setup*, call ``super().setup()`` before you create a stream or you won’t
+have a *loop* instance for them. We call *setup* from *run*, because the
+context must be created within the new system process, which wouldn’t be the
+case if we called *setup* from *__init__*.
+The *stop* method is not really necessary in this example, but it can be used
+to send stop messages to sub-processes when the main process terminates and to
+do other kinds of clean-up. You can also execute it if you except
+a ``KeyboardInterrupt`` after calling *run*.
+MessageHandler — The Base Class for Message Handlers
+A PyZMQ message handler can be any callable that accepts one argument—the list
+of message parts as byte objects. Hence, our *MessageHandler* class needs to
+ from zmq.utils import jsonapi as json
+ class MessageHandler(object):
+ Base class for message handlers for a :class:`ZMQProcess`.
+ Inheriting classes only need to implement a handler function for each
+ def __init__(self, json_load=-1):
+ self._json_load = json_load
+ def __call__(self, msg):
- Handles messages from a Pinger:
- Stop the ioloop and exit.
+ Gets called when a messages is received by the stream this handlers is
+ registered at. *msg* is a list as return by
- msg_type, data = json.loads(msg)
+ # Try to JSON-decode the index "self._json_load" of the message
+ msg_type, data = json.loads(msg[i])
- rep = self.ping_handler.make_pong(data)
+ # Get the actual message handler and call it
+ if msg_type.startswith('_'):
+ raise AttributeError('%s starts with an "_"' % msg_type)
- elif msg_type == 'plzdiekthxbye':
+ getattr(self, msg_type)(*msg)
- raise RuntimeError('Received unkown message type: %s' % msg_type)
+As you can see, it’s quite simle. It just tries to JSON-load the index defined
+by ``self._json_load``. We earlier defined, that the first element of the
+JSON-encoded message defines the message type (e.g., *ping*). If an attribute
+of the same name exists in the inheriting class, it is called with the remainer
-There are a couple of things to note here:
+You can also add logging or additional security measures here, but that is not
-- I instantiated the *PingHandler* in the process’ *__init__* method. If you
- are going to start this process as a sub-process via *start*, make sure
- everything you instantiate in *__init__* is pickle-able or it won’t work on
- Windows (Linux and Mac OS X use *fork* to create a sub-process and *fork*
- just makes a copy of the main process and gives it a new process ID. `On
- Windows <http://docs.python.org/py3k/library/multiprocessing#windows>`_,
- there is no *fork* and the context of your main process is pickled and sent
-- In *setup*, call ``super().setup()`` before you create a stream or you
- won’t have a loop instance for them. You don’t call *setup* in
- the process’ *__init__*, because the context must be created within the
- new system process. So we call *setup* in *run*.
+RepStreamHandler — The Concrete Message Handler
-- The *stop* method is not really necessary in this example, but it can be used
- to send stop messages to sub-processes when the main process terminates and
- to do other kinds of clean-up. You can also execute it if you except a
- ``KeyboardInterrupt`` after calling *run*.
+This class inherits the *MessageHandler* I just showed you and is used in
+*PongProc.setup*. It defines a handler method for *ping* messages and the
+*plzdiekthxbye* stop message. In its *__init__* it receives references to the
+*rep_stream*, PongProcs *stop* method and to the *ping_handler*, our actual
-- *handle_rep_stream* is the message dispatcher for the process’ *REP* stream.
- It parses the message and calls the appropriate handler for that message (or
- raises an error if the message type is invalid). If your *if* and *elif*
- statements all do the same, you might consider replacing them with a dict
- that contains the handlers for each message type:
+ # example_app/pongproc.py
+ class RepStreamHandler(base.MessageHandler):
+ """Handels messages arrvinge at the PongProc’s REP stream."""
+ def __init__(self, rep_stream, stop, ping_handler):
+ self._rep_stream = rep_stream
+ self._ping_handler = ping_handler
- 'msg': self.handler_for_msg,
- rep = handlers[msg_type](data)
- raise RuntimeError('Received unknown message.')
+ """Send back a pong."""
+ rep = self._ping_handler.make_pong(data)
+ def plzdiekthxbye(self, data):
+ """Just calls :meth:`PongProc.stop`."""
PingHandler – The Application Logic