Commits

Stefan Scherfke  committed 7267c55

Files for part 3.

  • Participants
  • Parent commits ead3fb4

Comments (0)

Files changed (14)

File article_2.html

 testing PyZMQ apps is really not that hard and not much different from normal
 unit testing. However, what we know now is only, that our process should work
 <em>in theory</em>. We haven’t yet started it and sent real messages to it.</p>
-<p>The next and final part of this series will show you how you can automate
-testing complete processes. Until then, you should get your <a class="reference external" href="http://pypi.python.org/pypi/pytest-cov">test coverage</a> up to 100% to protect yourself from
-nasty surprises when you start with process testing.</p>
+<p>The <a class="reference external" href="http://stefan.sofa-rockers.org/2012/02/15/designing-and-testing-pyzmq-applications-part-3/">next and final part</a>
+of this series will show you how you
+can automate testing complete processes. Until then, you should get your <a class="reference external" href="http://pypi.python.org/pypi/pytest-cov">test
+coverage</a> up to 100% to protect
+yourself from nasty surprises when you start with process testing.</p>
 </div>
 </div>
 </body>

File article_2.txt

 unit testing. However, what we know now is only, that our process should work
 *in theory*. We haven’t yet started it and sent real messages to it.
 
-The next and final part of this series will show you how you can automate
-testing complete processes. Until then, you should get your `test coverage
-<http://pypi.python.org/pypi /pytest-cov>`_ up to 100% to protect yourself from
-nasty surprises when you start with process testing.
+The `next and final part
+<http://stefan.sofa-rockers.org/2012/02/15/designing-and-testing-pyzmq-applications-part-3/>`_
+of this series will show you how you
+can automate testing complete processes. Until then, you should get your `test
+coverage <http://pypi.python.org/pypi /pytest-cov>`_ up to 100% to protect
+yourself from nasty surprises when you start with process testing.

File article_3.html

+<?xml version="1.0" encoding="utf-8" ?>
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
+<head>
+<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+<meta name="generator" content="Docutils 0.8.1: http://docutils.sourceforge.net/" />
+<title>Designing and Testing PyZMQ Applications – Part 3</title>
+<style type="text/css">
+
+/*
+:Author: David Goodger (goodger@python.org)
+:Id: $Id: html4css1.css 7056 2011-06-17 10:50:48Z milde $
+:Copyright: This stylesheet has been placed in the public domain.
+
+Default cascading style sheet for the HTML output of Docutils.
+
+See http://docutils.sf.net/docs/howto/html-stylesheets.html for how to
+customize this style sheet.
+*/
+
+/* used to remove borders from tables and images */
+.borderless, table.borderless td, table.borderless th {
+  border: 0 }
+
+table.borderless td, table.borderless th {
+  /* Override padding for "table.docutils td" with "! important".
+     The right padding separates the table cells. */
+  padding: 0 0.5em 0 0 ! important }
+
+.first {
+  /* Override more specific margin styles with "! important". */
+  margin-top: 0 ! important }
+
+.last, .with-subtitle {
+  margin-bottom: 0 ! important }
+
+.hidden {
+  display: none }
+
+a.toc-backref {
+  text-decoration: none ;
+  color: black }
+
+blockquote.epigraph {
+  margin: 2em 5em ; }
+
+dl.docutils dd {
+  margin-bottom: 0.5em }
+
+object[type="image/svg+xml"], object[type="application/x-shockwave-flash"] {
+  overflow: hidden;
+}
+
+/* Uncomment (and remove this text!) to get bold-faced definition list terms
+dl.docutils dt {
+  font-weight: bold }
+*/
+
+div.abstract {
+  margin: 2em 5em }
+
+div.abstract p.topic-title {
+  font-weight: bold ;
+  text-align: center }
+
+div.admonition, div.attention, div.caution, div.danger, div.error,
+div.hint, div.important, div.note, div.tip, div.warning {
+  margin: 2em ;
+  border: medium outset ;
+  padding: 1em }
+
+div.admonition p.admonition-title, div.hint p.admonition-title,
+div.important p.admonition-title, div.note p.admonition-title,
+div.tip p.admonition-title {
+  font-weight: bold ;
+  font-family: sans-serif }
+
+div.attention p.admonition-title, div.caution p.admonition-title,
+div.danger p.admonition-title, div.error p.admonition-title,
+div.warning p.admonition-title {
+  color: red ;
+  font-weight: bold ;
+  font-family: sans-serif }
+
+/* Uncomment (and remove this text!) to get reduced vertical space in
+   compound paragraphs.
+div.compound .compound-first, div.compound .compound-middle {
+  margin-bottom: 0.5em }
+
+div.compound .compound-last, div.compound .compound-middle {
+  margin-top: 0.5em }
+*/
+
+div.dedication {
+  margin: 2em 5em ;
+  text-align: center ;
+  font-style: italic }
+
+div.dedication p.topic-title {
+  font-weight: bold ;
+  font-style: normal }
+
+div.figure {
+  margin-left: 2em ;
+  margin-right: 2em }
+
+div.footer, div.header {
+  clear: both;
+  font-size: smaller }
+
+div.line-block {
+  display: block ;
+  margin-top: 1em ;
+  margin-bottom: 1em }
+
+div.line-block div.line-block {
+  margin-top: 0 ;
+  margin-bottom: 0 ;
+  margin-left: 1.5em }
+
+div.sidebar {
+  margin: 0 0 0.5em 1em ;
+  border: medium outset ;
+  padding: 1em ;
+  background-color: #ffffee ;
+  width: 40% ;
+  float: right ;
+  clear: right }
+
+div.sidebar p.rubric {
+  font-family: sans-serif ;
+  font-size: medium }
+
+div.system-messages {
+  margin: 5em }
+
+div.system-messages h1 {
+  color: red }
+
+div.system-message {
+  border: medium outset ;
+  padding: 1em }
+
+div.system-message p.system-message-title {
+  color: red ;
+  font-weight: bold }
+
+div.topic {
+  margin: 2em }
+
+h1.section-subtitle, h2.section-subtitle, h3.section-subtitle,
+h4.section-subtitle, h5.section-subtitle, h6.section-subtitle {
+  margin-top: 0.4em }
+
+h1.title {
+  text-align: center }
+
+h2.subtitle {
+  text-align: center }
+
+hr.docutils {
+  width: 75% }
+
+img.align-left, .figure.align-left, object.align-left {
+  clear: left ;
+  float: left ;
+  margin-right: 1em }
+
+img.align-right, .figure.align-right, object.align-right {
+  clear: right ;
+  float: right ;
+  margin-left: 1em }
+
+img.align-center, .figure.align-center, object.align-center {
+  display: block;
+  margin-left: auto;
+  margin-right: auto;
+}
+
+.align-left {
+  text-align: left }
+
+.align-center {
+  clear: both ;
+  text-align: center }
+
+.align-right {
+  text-align: right }
+
+/* reset inner alignment in figures */
+div.align-right {
+  text-align: inherit }
+
+/* div.align-center * { */
+/*   text-align: left } */
+
+ol.simple, ul.simple {
+  margin-bottom: 1em }
+
+ol.arabic {
+  list-style: decimal }
+
+ol.loweralpha {
+  list-style: lower-alpha }
+
+ol.upperalpha {
+  list-style: upper-alpha }
+
+ol.lowerroman {
+  list-style: lower-roman }
+
+ol.upperroman {
+  list-style: upper-roman }
+
+p.attribution {
+  text-align: right ;
+  margin-left: 50% }
+
+p.caption {
+  font-style: italic }
+
+p.credits {
+  font-style: italic ;
+  font-size: smaller }
+
+p.label {
+  white-space: nowrap }
+
+p.rubric {
+  font-weight: bold ;
+  font-size: larger ;
+  color: maroon ;
+  text-align: center }
+
+p.sidebar-title {
+  font-family: sans-serif ;
+  font-weight: bold ;
+  font-size: larger }
+
+p.sidebar-subtitle {
+  font-family: sans-serif ;
+  font-weight: bold }
+
+p.topic-title {
+  font-weight: bold }
+
+pre.address {
+  margin-bottom: 0 ;
+  margin-top: 0 ;
+  font: inherit }
+
+pre.literal-block, pre.doctest-block, pre.math {
+  margin-left: 2em ;
+  margin-right: 2em }
+
+span.classifier {
+  font-family: sans-serif ;
+  font-style: oblique }
+
+span.classifier-delimiter {
+  font-family: sans-serif ;
+  font-weight: bold }
+
+span.interpreted {
+  font-family: sans-serif }
+
+span.option {
+  white-space: nowrap }
+
+span.pre {
+  white-space: pre }
+
+span.problematic {
+  color: red }
+
+span.section-subtitle {
+  /* font-size relative to parent (h1..h6 element) */
+  font-size: 80% }
+
+table.citation {
+  border-left: solid 1px gray;
+  margin-left: 1px }
+
+table.docinfo {
+  margin: 2em 4em }
+
+table.docutils {
+  margin-top: 0.5em ;
+  margin-bottom: 0.5em }
+
+table.footnote {
+  border-left: solid 1px black;
+  margin-left: 1px }
+
+table.docutils td, table.docutils th,
+table.docinfo td, table.docinfo th {
+  padding-left: 0.5em ;
+  padding-right: 0.5em ;
+  vertical-align: top }
+
+table.docutils th.field-name, table.docinfo th.docinfo-name {
+  font-weight: bold ;
+  text-align: left ;
+  white-space: nowrap ;
+  padding-left: 0 }
+
+h1 tt.docutils, h2 tt.docutils, h3 tt.docutils,
+h4 tt.docutils, h5 tt.docutils, h6 tt.docutils {
+  font-size: 100% }
+
+ul.auto-toc {
+  list-style-type: none }
+
+</style>
+</head>
+<body>
+<div class="document" id="designing-and-testing-pyzmq-applications-part-3">
+<h1 class="title">Designing and Testing PyZMQ Applications – Part 3</h1>
+
+<p>The third and last part of this <a class="reference external" href="http://stefan.sofa-rockers.org/2012/02/01/designing-and-testing-pyzmq-applications-part-1/">series</a> is
+again just about testing. While the <a class="reference external" href="http://stefan.sofa-rockers.org/2012/02/07/designing-and-testing-pyzmq-applications-part-2/">previous article</a>
+focused on unit testing, this one will be about testing complete PyZMQ
+processes. This even involves some <a class="reference external" href="http://bit.ly/g3wTBC">magic</a>!</p>
+<p>Once you’ve made sure that your message dispatching and application logic works
+fine, you can actually start sending real messages to your process and checking
+real replies. This can be done for single processes—I call this <em>process
+testing</em>—and for your complete application (<em>system testing</em>).</p>
+<p>When you test a single process, you create sockets that mimic all processes the
+tested process communicates with. When you do a system test, you only mimic a
+client or just invoke your program from the command line and check its output
+(e.g., what it prints to <em>stdtout</em> and <em>stderr</em> or results written to a
+database).</p>
+<p>I’ll start with process testing, which is a bit more generalizable than system
+testing.</p>
+<div class="section" id="process-testing">
+<h1>Process Testing</h1>
+<p>The biggest problem I ran into when I started testing processes was that I
+often made blocking calls to <em>recv</em> methods and these halted my tests and gave
+me no output about what actually went wrong. Though you can make them non-
+blocking by passing <tt class="docutils literal">zmq.NOBLOCK</tt> as an extra argument, this doesn’t solve
+your problems. You will now need a very precise timing and many
+<tt class="docutils literal">time.sleep(x)</tt> calls, because <em>recv</em> will instantly raise an error if there
+is nothing to be received.</p>
+<p>My solution for this was to wrap PyZMQ sockets and add a timeout to its <em>send</em>
+and <em>recv</em> methods. The following wrapper will try to receive something for
+one second and raise an exception if that failed. There’s also <a class="reference external" href="https://bitbucket.org/ssc/pyzmq-article/src/tip/test/support.py#cl-27">a simple
+wrapper</a> for methods like <em>connect</em> or <em>bind</em>,
+but it’s really not that interesting, so I’ll omit it here.</p>
+<div class="highlight"><pre><span class="c"># test/support.py</span>
+
+<span class="k">def</span> <span class="nf">get_wrapped_fwd</span><span class="p">(</span><span class="n">func</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    Returns a wrapper, that tries to call *func* multiple time in non-blocking</span>
+<span class="sd">    mode before rasing an :class:`zmq.ZMQError`.</span>
+
+<span class="sd">    &quot;&quot;&quot;</span>
+    <span class="k">def</span> <span class="nf">forwarder</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
+        <span class="c"># 100 tries * 0.01 second == 1 second</span>
+        <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">100</span><span class="p">):</span>
+            <span class="k">try</span><span class="p">:</span>
+                <span class="n">rep</span> <span class="o">=</span> <span class="n">func</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="n">flags</span><span class="o">=</span><span class="n">zmq</span><span class="o">.</span><span class="n">NOBLOCK</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+                <span class="k">return</span> <span class="n">rep</span>
+
+            <span class="k">except</span> <span class="n">zmq</span><span class="o">.</span><span class="n">ZMQError</span><span class="p">:</span>
+                <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mf">0.01</span><span class="p">)</span>
+
+        <span class="c"># We should not get here, so raise an error.</span>
+        <span class="n">msg</span> <span class="o">=</span> <span class="s">&#39;Could not </span><span class="si">%s</span><span class="s"> message.&#39;</span> <span class="o">%</span> <span class="n">func</span><span class="o">.</span><span class="n">__name__</span><span class="p">[:</span><span class="mi">4</span><span class="p">]</span>
+        <span class="k">raise</span> <span class="n">zmq</span><span class="o">.</span><span class="n">ZMQError</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
+
+    <span class="k">return</span> <span class="n">forwarder</span>
+</pre></div>
+<p>This wrapper is now used to create a <em>TestSocket</em> class with the desired
+behavior:</p>
+<div class="highlight"><pre><span class="c"># test/support.py</span>
+
+<span class="k">class</span> <span class="nc">TestSocket</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    Wraps ZMQ :class:`~zmq.core.socket.Socket`. All *recv* and *send* methods</span>
+<span class="sd">    will be called multiple times in non-blocking mode before a</span>
+<span class="sd">    :class:`zmq.ZMQError` is raised.</span>
+
+<span class="sd">    &quot;&quot;&quot;</span>
+    <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">sock_type</span><span class="p">):</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_context</span> <span class="o">=</span> <span class="n">context</span>
+
+        <span class="n">sock</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="n">socket</span><span class="p">(</span><span class="n">sock_type</span><span class="p">)</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_sock</span> <span class="o">=</span> <span class="n">sock</span>
+
+        <span class="n">forwards</span> <span class="o">=</span> <span class="p">[</span>  <span class="c"># These methods can simply be forwarded</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">bind</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">bind_to_random_port</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">connect</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">close</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">setsockopt</span><span class="p">,</span>
+        <span class="p">]</span>
+        <span class="n">wrapped_fwd</span> <span class="o">=</span> <span class="p">[</span>  <span class="c"># These methods are wrapped with a for loop</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">recv</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">recv_json</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">recv_multipart</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">recv_unicode</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">send</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">send_json</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">send_multipart</span><span class="p">,</span>
+            <span class="n">sock</span><span class="o">.</span><span class="n">send_unicode</span><span class="p">,</span>
+        <span class="p">]</span>
+
+        <span class="k">for</span> <span class="n">func</span> <span class="ow">in</span> <span class="n">forwards</span><span class="p">:</span>
+            <span class="nb">setattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">func</span><span class="o">.</span><span class="n">__name__</span><span class="p">,</span> <span class="n">get_forwarder</span><span class="p">(</span><span class="n">func</span><span class="p">))</span>
+
+        <span class="k">for</span> <span class="n">func</span> <span class="ow">in</span> <span class="n">wrapped_fwd</span><span class="p">:</span>
+            <span class="nb">setattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">func</span><span class="o">.</span><span class="n">__name__</span><span class="p">,</span> <span class="n">get_wrapped_fwd</span><span class="p">(</span><span class="n">func</span><span class="p">))</span>
+</pre></div>
+<p>In order to reuse the same ports for all test methods, you need to cleanly
+close all sockets after each test. To handle method level setup/teardown in
+pytest, you need to implement a <em>setup_method</em> and a <em>teardown_method</em>. In the
+setup method, you create one or more <em>TestSocket</em> instances that mimic other
+processes and you also start the process to be tested:</p>
+<div class="highlight"><pre><span class="c"># test/process/test_pongproc.py</span>
+<span class="kn">import</span> <span class="nn">pytest</span>
+<span class="kn">import</span> <span class="nn">zmq</span>
+
+<span class="kn">from</span> <span class="nn">test.support</span> <span class="kn">import</span> <span class="n">ProcessTest</span><span class="p">,</span> <span class="n">make_sock</span>
+<span class="kn">import</span> <span class="nn">pongproc</span>
+
+
+<span class="n">host</span> <span class="o">=</span> <span class="s">&#39;127.0.0.1&#39;</span>
+<span class="n">port</span> <span class="o">=</span> <span class="mi">5678</span>
+
+
+<span class="k">class</span> <span class="nc">TestProngProc</span><span class="p">(</span><span class="n">ProcessTest</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;Communication test for the Platform Manager process.&quot;&quot;&quot;</span>
+
+    <span class="k">def</span> <span class="nf">setup_method</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">method</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        Creates and starts a PongProc process and sets up sockets to</span>
+<span class="sd">        communicate with it.</span>
+
+<span class="sd">        &quot;&quot;&quot;</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">context</span> <span class="o">=</span> <span class="n">zmq</span><span class="o">.</span><span class="n">Context</span><span class="p">()</span>
+
+        <span class="c"># make_sock creates and connects a TestSocket that we will use to</span>
+        <span class="c"># mimic the Ping process</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">req_sock</span> <span class="o">=</span> <span class="n">make_sock</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">context</span><span class="p">,</span> <span class="n">zmq</span><span class="o">.</span><span class="n">REQ</span><span class="p">,</span>
+                                  <span class="n">connect</span><span class="o">=</span><span class="p">(</span><span class="n">host</span><span class="p">,</span> <span class="n">port</span><span class="p">))</span>
+
+        <span class="bp">self</span><span class="o">.</span><span class="n">pp</span> <span class="o">=</span> <span class="n">pongproc</span><span class="o">.</span><span class="n">PongProc</span><span class="p">((</span><span class="n">host</span><span class="p">,</span> <span class="n">port</span><span class="p">))</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">pp</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
+
+    <span class="k">def</span> <span class="nf">teardown_method</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">method</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        Sends a kill message to the pp and waits for the process to terminate.</span>
+
+<span class="sd">        &quot;&quot;&quot;</span>
+        <span class="c"># Send a stop message to the prong process and wait until it joins</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">req_sock</span><span class="o">.</span><span class="n">send_multipart</span><span class="p">([</span><span class="n">b</span><span class="s">&#39;[&quot;plzdiekthxbye&quot;, null]&#39;</span><span class="p">])</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">pp</span><span class="o">.</span><span class="n">join</span><span class="p">()</span>
+
+        <span class="bp">self</span><span class="o">.</span><span class="n">req_sock</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
+</pre></div>
+<p>You may have noticed that our test class inherits <em>ProcessTests</em>. This class
+and some helpers in a <a class="reference external" href="http://pytest.org/latest/plugins.html">conftest.py</a>
+allow us to use some magic that improves the readability of the actual test:</p>
+<div class="highlight"><pre><span class="c"># test/process/test_pongproc.py</span>
+
+    <span class="k">def</span> <span class="nf">test_ping</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;Tests a ping-pong sequence.&quot;&quot;&quot;</span>
+        <span class="k">yield</span> <span class="p">(</span><span class="s">&#39;send&#39;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">req_sock</span><span class="p">,</span> <span class="p">[],</span> <span class="p">[</span><span class="s">&#39;ping&#39;</span><span class="p">,</span> <span class="mi">1</span><span class="p">])</span>
+
+        <span class="n">reply</span> <span class="o">=</span> <span class="k">yield</span> <span class="p">(</span><span class="s">&#39;recv&#39;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">req_sock</span><span class="p">)</span>
+        <span class="k">assert</span> <span class="n">reply</span> <span class="o">==</span> <span class="p">[[</span><span class="s">&#39;pong&#39;</span><span class="p">,</span> <span class="mi">1</span><span class="p">]]</span>
+</pre></div>
+<p>You can just yield <em>send</em> or <em>recv</em> events from your test case! When you yield
+a <em>send</em>, the test machinery tries to send a message via the specified socket.
+When you yield a receive, <em>ProcessTest</em> tries to receive something from the
+socket and sends its result back to your test function, so that you can easily
+compare the reply with the expected result.</p>
+<p>The example above is roughly equivalent to the following code:</p>
+<div class="highlight"><pre><span class="bp">self</span><span class="o">.</span><span class="n">req_sock</span><span class="o">.</span><span class="n">send_multipart</span><span class="p">([]</span> <span class="o">+</span> <span class="p">[</span><span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">([</span><span class="s">&#39;ping&#39;</span><span class="p">,</span> <span class="mi">1</span><span class="p">])])</span>
+
+<span class="n">reply</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">req_sock</span><span class="o">.</span><span class="n">recv_multipart</span><span class="p">()</span>
+<span class="n">reply</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">[</span><span class="n">reply</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">]]</span>
+<span class="k">assert</span> <span class="n">reply</span> <span class="o">==</span> <span class="p">[[</span><span class="s">&#39;pong&#39;</span><span class="p">,</span> <span class="mi">1</span><span class="p">]]</span>
+</pre></div>
+<p>So how does this work? By default, if pytests finds a test function that is a
+generator, it assumes that it generates further test functions. Hence, our
+first step is to override this behavior. We can do this in a <tt class="docutils literal">conftest.py</tt>
+file in the <tt class="docutils literal">test/process/</tt> directory by implementing a
+<em>pytest_pycollect_makeitem</em> function. In this case, we collect generator
+functions like normal functions:</p>
+<div class="highlight"><pre><span class="c"># test/process/conftest.py</span>
+<span class="kn">from</span> <span class="nn">inspect</span> <span class="kn">import</span> <span class="n">isfunction</span><span class="p">,</span> <span class="n">isgeneratorfunction</span>
+
+
+<span class="k">def</span> <span class="nf">pytest_pycollect_makeitem</span><span class="p">(</span><span class="n">collector</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">obj</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    Collects all instance methods that are generators and returns them as</span>
+<span class="sd">    normal function items.</span>
+
+<span class="sd">    &quot;&quot;&quot;</span>
+    <span class="k">if</span> <span class="n">collector</span><span class="o">.</span><span class="n">funcnamefilter</span><span class="p">(</span><span class="n">name</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="s">&#39;__call__&#39;</span><span class="p">):</span>
+        <span class="k">if</span> <span class="n">isfunction</span><span class="p">(</span><span class="n">obj</span><span class="p">)</span> <span class="ow">or</span> <span class="n">isgeneratorfunction</span><span class="p">(</span><span class="n">obj</span><span class="p">):</span>
+            <span class="k">return</span> <span class="n">collector</span><span class="o">.</span><span class="n">_genfunctions</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">obj</span><span class="p">)</span>
+</pre></div>
+<p>Now, we need to tell pytest how to run a test on the collected generator
+functions. This can be done by implementing <em>pytest_runtest_call</em>. If the
+object we are going to test (<tt class="docutils literal">item.obj</tt>) is a generator function, we call the
+<em>run</em> method  of the object’s instance (<tt class="docutils literal">item.obj.__self__.run</tt>) and pass the
+generator function to it. If the test item contains a normal function, we run
+the default test.</p>
+<div class="highlight"><pre><span class="c"># test/process/conftest.py</span>
+
+<span class="k">def</span> <span class="nf">pytest_runtest_call</span><span class="p">(</span><span class="n">item</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    Passes the test generator (``item.obj``) to the ``run()`` method of the</span>
+<span class="sd">    generator&#39;s instance. This method should be inherited from</span>
+<span class="sd">    :class:`test.support.ProcessTest`.</span>
+
+<span class="sd">    &quot;&quot;&quot;</span>
+    <span class="k">if</span> <span class="n">isgeneratorfunction</span><span class="p">(</span><span class="n">item</span><span class="o">.</span><span class="n">obj</span><span class="p">):</span>
+        <span class="n">item</span><span class="o">.</span><span class="n">obj</span><span class="o">.</span><span class="n">__self__</span><span class="o">.</span><span class="n">run</span><span class="p">(</span><span class="n">item</span><span class="o">.</span><span class="n">obj</span><span class="p">)</span>
+    <span class="k">else</span><span class="p">:</span>  <span class="c"># Normal test execution for normal instance methods</span>
+        <span class="n">item</span><span class="o">.</span><span class="n">runtest</span><span class="p">()</span>
+</pre></div>
+<p>But wait—we didn’t implement a <em>run</em> method in our test case! So it must be
+inherited from <em>ProcessTest</em>. Let’s take a look at it:</p>
+<div class="highlight"><pre><span class="c"># test/support.py</span>
+<span class="k">class</span> <span class="nc">ProcessTest</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    Base class for process tests. It offers basic actions for sending and</span>
+<span class="sd">    receiving messages and implements the *run* methods that handles the</span>
+<span class="sd">    actual test generators.</span>
+
+<span class="sd">    &quot;&quot;&quot;</span>
+
+    <span class="k">def</span> <span class="nf">run</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">testfunc</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        Iterates over the *testfunc* generator and executes all actions it</span>
+<span class="sd">        yields. Results will be sent back into the generator.</span>
+
+<span class="sd">        :param testfunc: A generator function that yields tuples containing</span>
+<span class="sd">                an action keyword, which should be a function of this or</span>
+<span class="sd">                the inheriting class (like ``send`` or ``recv``) and additional</span>
+<span class="sd">                parameters that will be passed to that function, e.g.:</span>
+<span class="sd">                ``(&#39;send&#39;, socket_obj, [&#39;header&#39;], &#39;body&#39;)``</span>
+<span class="sd">        :type testfunc:  generatorfunction</span>
+
+<span class="sd">        &quot;&quot;&quot;</span>
+        <span class="n">item_gen</span> <span class="o">=</span> <span class="n">testfunc</span><span class="p">()</span>
+        <span class="n">item</span> <span class="o">=</span> <span class="nb">next</span><span class="p">(</span><span class="n">item_gen</span><span class="p">)</span>
+
+        <span class="k">def</span> <span class="nf">throw_err</span><span class="p">(</span><span class="n">skip_levels</span><span class="o">=</span><span class="mi">0</span><span class="p">):</span>
+            <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">            Throws the last error to *item_gen* and skips *skip_levels* in</span>
+<span class="sd">            the traceback to point to the line that yielded the last event.</span>
+
+<span class="sd">            &quot;&quot;&quot;</span>
+            <span class="n">etype</span><span class="p">,</span> <span class="n">evalue</span><span class="p">,</span> <span class="n">tb</span> <span class="o">=</span> <span class="n">sys</span><span class="o">.</span><span class="n">exc_info</span><span class="p">()</span>
+            <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">skip_levels</span><span class="p">):</span>
+                <span class="n">tb</span> <span class="o">=</span> <span class="n">tb</span><span class="o">.</span><span class="n">tb_next</span>
+            <span class="n">item_gen</span><span class="o">.</span><span class="n">throw</span><span class="p">(</span><span class="n">etype</span><span class="p">,</span> <span class="n">evalue</span><span class="p">,</span> <span class="n">tb</span><span class="p">)</span>
+
+        <span class="k">try</span><span class="p">:</span>
+            <span class="k">while</span> <span class="bp">True</span><span class="p">:</span>
+                <span class="k">try</span><span class="p">:</span>
+                    <span class="c"># Call the event handler and pass the args,</span>
+                    <span class="c"># e.g., self.send(socket_obj, header, body)</span>
+                    <span class="n">ret</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="p">[</span><span class="mi">0</span><span class="p">])(</span><span class="o">*</span><span class="n">item</span><span class="p">[</span><span class="mi">1</span><span class="p">:])</span>
+
+                    <span class="c"># Send the results back to the test and get the next item</span>
+                    <span class="n">item</span> <span class="o">=</span> <span class="n">item_gen</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">ret</span><span class="p">)</span>
+
+                <span class="k">except</span> <span class="n">zmq</span><span class="o">.</span><span class="n">ZMQError</span><span class="p">:</span>
+                    <span class="n">throw_err</span><span class="p">(</span><span class="mi">3</span><span class="p">)</span>  <span class="c"># PyZMQ could not send/recv</span>
+                <span class="k">except</span> <span class="ne">AssertionError</span><span class="p">:</span>
+                    <span class="n">throw_err</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>  <span class="c"># Error in the test</span>
+        <span class="k">except</span> <span class="ne">StopIteration</span><span class="p">:</span>
+            <span class="k">pass</span>
+</pre></div>
+<p>The <em>run</em> method simply iterates over all events our <em>testfunc</em> generates and
+calls a method with the name of the event (e.g., <em>send</em> or <em>recv</em>). Their
+return value is sent back into the generator. If an error occurs, the
+exception’s traceback is modified to point to the line of code that yielded the
+according event and not to the <em>run</em> method itself.</p>
+<p>The methods <em>send</em> and <em>recv</em> roughly do the same as the snippet I showed you
+above:</p>
+<div class="highlight"><pre><span class="c"># test/support.py</span>
+
+    <span class="k">def</span> <span class="nf">send</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">socket</span><span class="p">,</span> <span class="n">header</span><span class="p">,</span> <span class="n">body</span><span class="p">,</span> <span class="n">extra_data</span><span class="o">=</span><span class="p">[]):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        JSON-encodes *body*, concatenates it with *header*, appends</span>
+<span class="sd">        *extra_data* and sends it as multipart message over *socket*.</span>
+
+<span class="sd">        *header* and *extra_data* should be lists containg byte objects or</span>
+<span class="sd">        objects implementing the buffer interface (like NumPy arrays).</span>
+
+<span class="sd">        &quot;&quot;&quot;</span>
+        <span class="n">socket</span><span class="o">.</span><span class="n">send_multipart</span><span class="p">(</span><span class="n">header</span> <span class="o">+</span> <span class="p">[</span><span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">body</span><span class="p">)]</span> <span class="o">+</span> <span class="n">extra_data</span><span class="p">)</span>
+
+    <span class="k">def</span> <span class="nf">recv</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">socket</span><span class="p">,</span> <span class="n">json_load_index</span><span class="o">=-</span><span class="mi">1</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        Receives and returns a multipart message from *socket* and tries to</span>
+<span class="sd">        JSON-decode the item at position *json_load_index* (defaults to ``-1``;</span>
+<span class="sd">        the last element in the list). The original byte string will be</span>
+<span class="sd">        replaced by the loaded object. Set *json_load_index* to ``None`` to get</span>
+<span class="sd">        the original, unchanged message.</span>
+
+<span class="sd">        &quot;&quot;&quot;</span>
+        <span class="n">msg</span> <span class="o">=</span> <span class="n">socket</span><span class="o">.</span><span class="n">recv_multipart</span><span class="p">()</span>
+        <span class="k">if</span> <span class="n">json_load_index</span> <span class="ow">is</span> <span class="ow">not</span> <span class="bp">None</span><span class="p">:</span>
+            <span class="n">msg</span><span class="p">[</span><span class="n">json_load_index</span><span class="p">]</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">msg</span><span class="p">[</span><span class="n">json_load_index</span><span class="p">])</span>
+        <span class="k">return</span> <span class="n">msg</span>
+</pre></div>
+<p>You can even add your own event handler to your test class. I used this, for
+example, to add a <em>log</em> event that checks if a PyZMQ log handler sent the
+expected log messages:</p>
+<div class="highlight"><pre><span class="k">def</span> <span class="nf">log</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">substr</span><span class="o">=</span><span class="s">&#39;&#39;</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    Receives a message and asserts, that it is a log message and that</span>
+<span class="sd">    *substr* is in that message.</span>
+
+<span class="sd">    Usage:</span>
+<span class="sd">        yield (&#39;log&#39;, &#39;Ai iz in ur log mesage&#39;)</span>
+
+<span class="sd">    &quot;&quot;&quot;</span>
+    <span class="n">msg</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">log_sock</span><span class="o">.</span><span class="n">recv_json</span><span class="p">()</span>
+    <span class="k">assert</span> <span class="n">msg</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">==</span> <span class="s">&#39;log_message&#39;</span>
+    <span class="k">assert</span> <span class="n">substr</span> <span class="ow">in</span> <span class="n">msg</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
+</pre></div>
+<div class="section" id="what-if-your-process-starts-further-subprocesses">
+<h2>What if your process starts further subprocesses?</h2>
+<p>In some cases, the process you are about to test starts additional subprocesses
+that you don’t want to test. Even worse, these processes might communicate via
+sockets bound to random ports. And EVEN WORSE, the process you are testing
+might depend on excepting a <em>KeyboardInterrupt</em> to send stop messages to child
+processes or to clean something up!</p>
+<p>The last problem is quite easy to solve: You just a send a <em>SIGINT</em> to your
+process from the test:</p>
+<div class="highlight"><pre><span class="kn">import</span> <span class="nn">os</span><span class="o">,</span> <span class="nn">signal</span>
+
+<span class="k">def</span> <span class="nf">teardown_method</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">method</span><span class="p">):</span>
+    <span class="n">os</span><span class="o">.</span><span class="n">kill</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">my_proc</span><span class="o">.</span><span class="n">pid</span><span class="p">,</span> <span class="n">signal</span><span class="o">.</span><span class="n">SIGINT</span><span class="p">)</span>
+    <span class="bp">self</span><span class="o">.</span><span class="n">my_proc</span><span class="o">.</span><span class="n">join</span><span class="p">()</span>
+
+    <span class="c"># Now you can close the test sockets</span>
+</pre></div>
+<p>If you don’t want to start a certain subprocess, you can just mock it.
+Imagine, you have two processes <tt class="docutils literal">a.A</tt> and <tt class="docutils literal">b.B</tt>, where <em>A</em> starts <em>B</em>,
+then you just mock <em>B</em> before starting <em>A</em>:</p>
+<div class="highlight"><pre><span class="k">with</span> <span class="n">mock</span><span class="o">.</span><span class="n">patch</span><span class="p">(</span><span class="s">&#39;b.B&#39;</span><span class="p">):</span>
+    <span class="bp">self</span><span class="o">.</span><span class="n">a</span> <span class="o">=</span> <span class="n">A</span><span class="p">()</span>
+    <span class="bp">self</span><span class="o">.</span><span class="n">a</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
+</pre></div>
+<p>Imagine now, that <em>A</em> binds a socket to a random port and uses that socket to
+communicate with <em>B</em>. If you want to mock <em>B</em> in your tests, you need that port
+number in order to connect to it and send messages to <em>A</em>.</p>
+<p>But how can you get that number? When <em>A</em> creates <em>B</em>, it already runs in its
+own process, so a simple attribute access won’t work. Setting a random seed
+would only work if you did that directly in <em>A</em> when it’s already running. But
+doing that just for the tests is not such a good idea. It also may not work
+reliably on all systems and Python versions.</p>
+<p>However, <em>A</em> must pass the socket number to <em>B</em>, so that <em>B</em> can connect to
+<em>A</em>. Thus, we can create a mock for <em>B</em> that will send us its port number via a
+<cite>queue &lt;http://docs.python.org/py3k/library/multiprocessing#exchanging-objects-
+between-processes&gt;</cite>:</p>
+<div class="highlight"><pre><span class="k">class</span> <span class="nc">ProcMock</span><span class="p">(</span><span class="n">mock</span><span class="o">.</span><span class="n">Mock</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    This mock returns itself when called, so it acts like both, the</span>
+<span class="sd">    process’ class and instance object.</span>
+
+<span class="sd">    &quot;&quot;&quot;</span>
+    <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="n">__init__</span><span class="p">()</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">queue</span> <span class="o">=</span> <span class="n">multiprocessing</span><span class="o">.</span><span class="n">Queue</span><span class="p">()</span>
+
+    <span class="k">def</span> <span class="nf">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">port</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;Will be called when A instantiates B and passes its port number.&quot;&quot;&quot;</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">queue</span><span class="o">.</span><span class="n">put</span><span class="p">(</span><span class="n">port</span><span class="p">)</span>
+        <span class="k">return</span> <span class="bp">self</span>
+
+    <span class="k">def</span> <span class="nf">start</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="k">return</span>  <span class="c"># Just make sure the methods exists and returns nothing</span>
+
+    <span class="k">def</span> <span class="nf">join</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="k">return</span>  <span class="c"># Just make sure the methods exists and returns nothing</span>
+
+
+<span class="k">class</span> <span class="nc">TestA</span><span class="p">(</span><span class="n">ProcessTest</span><span class="p">):</span>
+
+    <span class="k">def</span> <span class="nf">setup_method</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+
+        <span class="n">b_mock</span> <span class="o">=</span> <span class="n">ProcMock</span><span class="p">()</span>
+        <span class="k">with</span> <span class="n">mock</span><span class="o">.</span><span class="n">patch</span><span class="p">(</span><span class="s">&#39;b.B&#39;</span><span class="p">,</span> <span class="n">new</span><span class="o">=</span><span class="n">b_mock</span><span class="p">):</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">a</span> <span class="o">=</span> <span class="n">A</span><span class="p">()</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">a</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
+
+        <span class="c"># Get the port A is listening on</span>
+        <span class="n">port</span> <span class="o">=</span> <span class="n">b_mock</span><span class="o">.</span><span class="n">queue</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
+
+        <span class="c"># ...</span>
+</pre></div>
+<p>As you’ve seen, process testing is really not as simple as unit testing. But I
+always found bugs with it that my unit tests coudn’t detect. If you cover all
+communication sequences for a process in a process test, you can be pretty
+sure, that it will also work flawlessly in the final application.</p>
+</div>
+</div>
+<div class="section" id="system-testing">
+<h1>System Testing</h1>
+<p>If your application consists of more than one process, you still need to test
+whether all processes work nicely together or not. This is something you cannot
+simulate reliably with a process tests, as much as unit tests can’t replace the
+process test.</p>
+<p>Writing a good system test is very application-specific and can, depending on
+the complexity of your application, be very hard or very easy. Fortunately, the
+latter is the case for our ping-pong app. We just start it and copy its output
+to a file. If the output is not what we expected, we modify the file
+accordingly. In our test, we can now simply invoke our programm again, capture
+its output and compare it to the contents of the file we created before:</p>
+<div class="highlight"><pre><span class="c"># test/system/test_pongproc.py</span>
+<span class="kn">import</span> <span class="nn">os.path</span>
+<span class="kn">import</span> <span class="nn">subprocess</span>
+
+<span class="kn">import</span> <span class="nn">pytest</span>
+
+
+<span class="k">def</span> <span class="nf">test_pongproc</span><span class="p">():</span>
+    <span class="n">filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="s">&#39;test&#39;</span><span class="p">,</span> <span class="s">&#39;data&#39;</span><span class="p">,</span> <span class="s">&#39;pongproc.out&#39;</span><span class="p">)</span>
+    <span class="n">expected</span> <span class="o">=</span> <span class="nb">open</span><span class="p">(</span><span class="n">filename</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
+
+    <span class="n">output</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">check_output</span><span class="p">([</span><span class="s">&#39;python&#39;</span><span class="p">,</span> <span class="s">&#39;pongproc.py&#39;</span><span class="p">],</span>
+                                     <span class="n">universal_newlines</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+
+    <span class="k">assert</span> <span class="n">output</span> <span class="o">==</span> <span class="n">expected</span>
+</pre></div>
+<p>If your application was a server, another way of doing the system test would be
+to emulate a client that speaks with it. Your system test would then be very
+similar to your process tests, except that you only mimic the client and not
+all processes your main process communicates with.</p>
+<p>Other applications (like, for instance, simulations) might create a database
+containing collected data. Here, you might check if these results match your
+expectations.</p>
+<p>Of course you can also combine these possibilities or do something completely
+different …</p>
+</div>
+<div class="section" id="my-test-are-now-running-sooo-slow">
+<h1>“My Test Are now Running sooo Slow!”</h1>
+<p>System and process tests often run much slower than simple unit tests, so you
+may want to skip them most of the time. Pytest allows you to <a class="reference external" href="http://pytest.org/latest/example/markers.html#mark-examples">mark</a> a test with a
+given name. You can then (de)select tests based on their mark when you invoke
+pytest.</p>
+<p>To mark a module e.g. as <tt class="docutils literal">process</tt> test, just put a line <tt class="docutils literal">pytestmark =
+pytest.mark.process</tt> somewhere in it. Likewise, you can add a <tt class="docutils literal">pytestmark =
+pytest.mark.system</tt> to mark a module as system test.</p>
+<p>You can now deselect process and system tests:</p>
+<div class="highlight"><pre><span class="nv">$ </span>py.test -m <span class="s2">&quot;not (process or system)&quot;</span>
+</pre></div>
+<p>You can put this into a <em>pytest.ini</em> as a default setting. To override this
+again, use <tt class="docutils literal">1</tt> or <tt class="docutils literal">True</tt> as selection expression:</p>
+<div class="highlight"><pre><span class="nv">$ </span>py.test -m 1
+</pre></div>
+</div>
+<div class="section" id="summary">
+<h1>Summary</h1>
+<p>Process and system testing were the last two topics I wanted to cover in this
+series. Compared to simple unit tests, they require a bit more effort. I think
+they are definitely worth the extra work since they give you a lot more
+confidence that your program actually works, because they are much more
+realistic than unit tests can be.</p>
+<p>In the end, these articles became much longer and more elaborate then I
+originally planned them to be. However, I hope they provided a good overview
+about how to design and test applications with PyZMQ and I hope that you now
+run into much less problems than I did when I first started working with PyZMQ.</p>
+</div>
+</div>
+</body>
+</html>

File article_3.txt

+Designing and Testing PyZMQ Applications – Part 3
+=================================================
+
+The third and last part of this `series <http://stefan.sofa-
+rockers.org/2012/02/01 /designing-and-testing-pyzmq-applications-part-1/>`_ is
+again just about testing. While the `previous article <http://stefan.sofa-
+rockers.org/2012/02/07/designing-and-testing-pyzmq-applications-part-2/>`_
+focused on unit testing, this one will be about testing complete PyZMQ
+processes. This even involves some `magic <http://bit.ly/g3wTBC>`_!
+
+Once you’ve made sure that your message dispatching and application logic works
+fine, you can actually start sending real messages to your process and checking
+real replies. This can be done for single processes—I call this *process
+testing*—and for your complete application (*system testing*).
+
+When you test a single process, you create sockets that mimic all processes the
+tested process communicates with. When you do a system test, you only mimic a
+client or just invoke your program from the command line and check its output
+(e.g., what it prints to *stdtout* and *stderr* or results written to a
+database).
+
+I’ll start with process testing, which is a bit more generalizable than system
+testing.
+
+
+Process Testing
+---------------
+
+The biggest problem I ran into when I started testing processes was that I
+often made blocking calls to *recv* methods and these halted my tests and gave
+me no output about what actually went wrong. Though you can make them non-
+blocking by passing ``zmq.NOBLOCK`` as an extra argument, this doesn’t solve
+your problems. You will now need a very precise timing and many
+``time.sleep(x)`` calls, because *recv* will instantly raise an error if there
+is nothing to be received.
+
+My solution for this was to wrap PyZMQ sockets and add a timeout to its *send*
+and *recv* methods. The following wrapper will try to receive something for
+one second and raise an exception if that failed. There’s also `a simple
+wrapper <https://bitbucket.org/ssc/pyzmq-
+article/src/tip/test/support.py#cl-27>`_ for methods like *connect* or *bind*,
+but it’s really not that interesting, so I’ll omit it here.
+
+.. code-block:: python
+
+    # test/support.py
+
+    def get_wrapped_fwd(func):
+        """
+        Returns a wrapper, that tries to call *func* multiple time in non-blocking
+        mode before rasing an :class:`zmq.ZMQError`.
+
+        """
+        def forwarder(*args, **kwargs):
+            # 100 tries * 0.01 second == 1 second
+            for i in range(100):
+                try:
+                    rep = func(*args, flags=zmq.NOBLOCK, **kwargs)
+                    return rep
+
+                except zmq.ZMQError:
+                    time.sleep(0.01)
+
+            # We should not get here, so raise an error.
+            msg = 'Could not %s message.' % func.__name__[:4]
+            raise zmq.ZMQError(msg)
+
+        return forwarder
+
+
+This wrapper is now used to create a *TestSocket* class with the desired
+behavior:
+
+.. code-block:: python
+
+    # test/support.py
+
+    class TestSocket(object):
+        """
+        Wraps ZMQ :class:`~zmq.core.socket.Socket`. All *recv* and *send* methods
+        will be called multiple times in non-blocking mode before a
+        :class:`zmq.ZMQError` is raised.
+
+        """
+        def __init__(self, context, sock_type):
+            self._context = context
+
+            sock = context.socket(sock_type)
+            self._sock = sock
+
+            forwards = [  # These methods can simply be forwarded
+                sock.bind,
+                sock.bind_to_random_port,
+                sock.connect,
+                sock.close,
+                sock.setsockopt,
+            ]
+            wrapped_fwd = [  # These methods are wrapped with a for loop
+                sock.recv,
+                sock.recv_json,
+                sock.recv_multipart,
+                sock.recv_unicode,
+                sock.send,
+                sock.send_json,
+                sock.send_multipart,
+                sock.send_unicode,
+            ]
+
+            for func in forwards:
+                setattr(self, func.__name__, get_forwarder(func))
+
+            for func in wrapped_fwd:
+                setattr(self, func.__name__, get_wrapped_fwd(func))
+
+
+In order to reuse the same ports for all test methods, you need to cleanly
+close all sockets after each test. To handle method level setup/teardown in
+pytest, you need to implement a *setup_method* and a *teardown_method*. In the
+setup method, you create one or more *TestSocket* instances that mimic other
+processes and you also start the process to be tested:
+
+.. code-block:: python
+
+    # test/process/test_pongproc.py
+    import pytest
+    import zmq
+
+    from test.support import ProcessTest, make_sock
+    import pongproc
+
+
+    host = '127.0.0.1'
+    port = 5678
+
+
+    class TestProngProc(ProcessTest):
+        """Communication test for the Platform Manager process."""
+
+        def setup_method(self, method):
+            """
+            Creates and starts a PongProc process and sets up sockets to
+            communicate with it.
+
+            """
+            self.context = zmq.Context()
+
+            # make_sock creates and connects a TestSocket that we will use to
+            # mimic the Ping process
+            self.req_sock = make_sock(self.context, zmq.REQ,
+                                      connect=(host, port))
+
+            self.pp = pongproc.PongProc((host, port))
+            self.pp.start()
+
+        def teardown_method(self, method):
+            """
+            Sends a kill message to the pp and waits for the process to terminate.
+
+            """
+            # Send a stop message to the prong process and wait until it joins
+            self.req_sock.send_multipart([b'["plzdiekthxbye", null]'])
+            self.pp.join()
+
+            self.req_sock.close()
+
+
+You may have noticed that our test class inherits *ProcessTests*. This class
+and some helpers in a `conftest.py <http://pytest.org/latest/plugins.html>`_
+allow us to use some magic that improves the readability of the actual test:
+
+.. code-block:: python
+
+    # test/process/test_pongproc.py
+
+        def test_ping(self):
+            """Tests a ping-pong sequence."""
+            yield ('send', self.req_sock, [], ['ping', 1])
+
+            reply = yield ('recv', self.req_sock)
+            assert reply == [['pong', 1]]
+
+You can just yield *send* or *recv* events from your test case! When you yield
+a *send*, the test machinery tries to send a message via the specified socket.
+When you yield a receive, *ProcessTest* tries to receive something from the
+socket and sends its result back to your test function, so that you can easily
+compare the reply with the expected result.
+
+The example above is roughly equivalent to the following code:
+
+.. code-block:: python
+
+    self.req_sock.send_multipart([] + [json.dumps(['ping', 1])])
+
+    reply = self.req_sock.recv_multipart()
+    reply[-1] = json.loads[reply[-1]]
+    assert reply == [['pong', 1]]
+
+
+So how does this work? By default, if pytests finds a test function that is a
+generator, it assumes that it generates further test functions. Hence, our
+first step is to override this behavior. We can do this in a ``conftest.py``
+file in the ``test/process/`` directory by implementing a
+*pytest_pycollect_makeitem* function. In this case, we collect generator
+functions like normal functions:
+
+.. code-block:: python
+
+    # test/process/conftest.py
+    from inspect import isfunction, isgeneratorfunction
+
+
+    def pytest_pycollect_makeitem(collector, name, obj):
+        """
+        Collects all instance methods that are generators and returns them as
+        normal function items.
+
+        """
+        if collector.funcnamefilter(name) and hasattr(obj, '__call__'):
+            if isfunction(obj) or isgeneratorfunction(obj):
+                return collector._genfunctions(name, obj)
+
+
+Now, we need to tell pytest how to run a test on the collected generator
+functions. This can be done by implementing *pytest_runtest_call*. If the
+object we are going to test (``item.obj``) is a generator function, we call the
+*run* method  of the object’s instance (``item.obj.__self__.run``) and pass the
+generator function to it. If the test item contains a normal function, we run
+the default test.
+
+.. code-block:: python
+
+    # test/process/conftest.py
+
+    def pytest_runtest_call(item):
+        """
+        Passes the test generator (``item.obj``) to the ``run()`` method of the
+        generator's instance. This method should be inherited from
+        :class:`test.support.ProcessTest`.
+
+        """
+        if isgeneratorfunction(item.obj):
+            item.obj.__self__.run(item.obj)
+        else:  # Normal test execution for normal instance methods
+            item.runtest()
+
+
+But wait—we didn’t implement a *run* method in our test case! So it must be
+inherited from *ProcessTest*. Let’s take a look at it:
+
+.. code-block:: python
+
+    # test/support.py
+    class ProcessTest(object):
+        """
+        Base class for process tests. It offers basic actions for sending and
+        receiving messages and implements the *run* methods that handles the
+        actual test generators.
+
+        """
+
+        def run(self, testfunc):
+            """
+            Iterates over the *testfunc* generator and executes all actions it
+            yields. Results will be sent back into the generator.
+
+            :param testfunc: A generator function that yields tuples containing
+                    an action keyword, which should be a function of this or
+                    the inheriting class (like ``send`` or ``recv``) and additional
+                    parameters that will be passed to that function, e.g.:
+                    ``('send', socket_obj, ['header'], 'body')``
+            :type testfunc:  generatorfunction
+
+            """
+            item_gen = testfunc()
+            item = next(item_gen)
+
+            def throw_err(skip_levels=0):
+                """
+                Throws the last error to *item_gen* and skips *skip_levels* in
+                the traceback to point to the line that yielded the last event.
+
+                """
+                etype, evalue, tb = sys.exc_info()
+                for i in range(skip_levels):
+                    tb = tb.tb_next
+                item_gen.throw(etype, evalue, tb)
+
+            try:
+                while True:
+                    try:
+                        # Call the event handler and pass the args,
+                        # e.g., self.send(socket_obj, header, body)
+                        ret = getattr(self, item[0])(*item[1:])
+
+                        # Send the results back to the test and get the next item
+                        item = item_gen.send(ret)
+
+                    except zmq.ZMQError:
+                        throw_err(3)  # PyZMQ could not send/recv
+                    except AssertionError:
+                        throw_err(1)  # Error in the test
+            except StopIteration:
+                pass
+
+
+The *run* method simply iterates over all events our *testfunc* generates and
+calls a method with the name of the event (e.g., *send* or *recv*). Their
+return value is sent back into the generator. If an error occurs, the
+exception’s traceback is modified to point to the line of code that yielded the
+according event and not to the *run* method itself.
+
+The methods *send* and *recv* roughly do the same as the snippet I showed you
+above:
+
+.. code-block:: python
+
+    # test/support.py
+
+        def send(self, socket, header, body, extra_data=[]):
+            """
+            JSON-encodes *body*, concatenates it with *header*, appends
+            *extra_data* and sends it as multipart message over *socket*.
+
+            *header* and *extra_data* should be lists containg byte objects or
+            objects implementing the buffer interface (like NumPy arrays).
+
+            """
+            socket.send_multipart(header + [json.dumps(body)] + extra_data)
+
+        def recv(self, socket, json_load_index=-1):
+            """
+            Receives and returns a multipart message from *socket* and tries to
+            JSON-decode the item at position *json_load_index* (defaults to ``-1``;
+            the last element in the list). The original byte string will be
+            replaced by the loaded object. Set *json_load_index* to ``None`` to get
+            the original, unchanged message.
+
+            """
+            msg = socket.recv_multipart()
+            if json_load_index is not None:
+                msg[json_load_index] = json.loads(msg[json_load_index])
+            return msg
+
+
+You can even add your own event handler to your test class. I used this, for
+example, to add a *log* event that checks if a PyZMQ log handler sent the
+expected log messages:
+
+.. code-block:: python
+
+    def log(self, substr=''):
+        """
+        Receives a message and asserts, that it is a log message and that
+        *substr* is in that message.
+
+        Usage:
+            yield ('log', 'Ai iz in ur log mesage')
+
+        """
+        msg = self.log_sock.recv_json()
+        assert msg[0] == 'log_message'
+        assert substr in msg[1]
+
+
+What if your process starts further subprocesses?
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+In some cases, the process you are about to test starts additional subprocesses
+that you don’t want to test. Even worse, these processes might communicate via
+sockets bound to random ports. And EVEN WORSE, the process you are testing
+might depend on excepting a *KeyboardInterrupt* to send stop messages to child
+processes or to clean something up!
+
+The last problem is quite easy to solve: You just a send a *SIGINT* to your
+process from the test:
+
+.. code-block:: python
+
+    import os, signal
+
+    def teardown_method(self, method):
+        os.kill(self.my_proc.pid, signal.SIGINT)
+        self.my_proc.join()
+
+        # Now you can close the test sockets
+
+
+If you don’t want to start a certain subprocess, you can just mock it.
+Imagine, you have two processes ``a.A`` and ``b.B``, where *A* starts *B*,
+then you just mock *B* before starting *A*:
+
+.. code-block:: python
+
+    with mock.patch('b.B'):
+        self.a = A()
+        self.a.start()
+
+
+Imagine now, that *A* binds a socket to a random port and uses that socket to
+communicate with *B*. If you want to mock *B* in your tests, you need that port
+number in order to connect to it and send messages to *A*.
+
+But how can you get that number? When *A* creates *B*, it already runs in its
+own process, so a simple attribute access won’t work. Setting a random seed
+would only work if you did that directly in *A* when it’s already running. But
+doing that just for the tests is not such a good idea. It also may not work
+reliably on all systems and Python versions.
+
+However, *A* must pass the socket number to *B*, so that *B* can connect to
+*A*. Thus, we can create a mock for *B* that will send us its port number via a
+`queue <http://docs.python.org/py3k/library/multiprocessing#exchanging-objects-
+between-processes>`:
+
+.. code-block:: python
+
+    class ProcMock(mock.Mock):
+        """
+        This mock returns itself when called, so it acts like both, the
+        process’ class and instance object.
+
+        """
+        def __init__(self):
+            super().__init__()
+            self.queue = multiprocessing.Queue()
+
+        def __call__(self, port):
+            """Will be called when A instantiates B and passes its port number."""
+            self.queue.put(port)
+            return self
+
+        def start(self):
+            return  # Just make sure the methods exists and returns nothing
+
+        def join(self):
+            return  # Just make sure the methods exists and returns nothing
+
+
+    class TestA(ProcessTest):
+
+        def setup_method(self):
+
+            b_mock = ProcMock()
+            with mock.patch('b.B', new=b_mock):
+                self.a = A()
+                self.a.start()
+
+            # Get the port A is listening on
+            port = b_mock.queue.get()
+
+            # ...
+
+
+As you’ve seen, process testing is really not as simple as unit testing. But I
+always found bugs with it that my unit tests coudn’t detect. If you cover all
+communication sequences for a process in a process test, you can be pretty
+sure, that it will also work flawlessly in the final application.
+
+
+System Testing
+---------------
+
+If your application consists of more than one process, you still need to test
+whether all processes work nicely together or not. This is something you cannot
+simulate reliably with a process tests, as much as unit tests can’t replace the
+process test.
+
+Writing a good system test is very application-specific and can, depending on
+the complexity of your application, be very hard or very easy. Fortunately, the
+latter is the case for our ping-pong app. We just start it and copy its output
+to a file. If the output is not what we expected, we modify the file
+accordingly. In our test, we can now simply invoke our programm again, capture
+its output and compare it to the contents of the file we created before:
+
+.. code-block:: python
+
+    # test/system/test_pongproc.py
+    import os.path
+    import subprocess
+
+    import pytest
+
+
+    def test_pongproc():
+        filename = os.path.join('test', 'data', 'pongproc.out')
+        expected = open(filename).read()
+
+        output = subprocess.check_output(['python', 'pongproc.py'],
+                                         universal_newlines=True)
+
+        assert output == expected
+
+
+If your application was a server, another way of doing the system test would be
+to emulate a client that speaks with it. Your system test would then be very
+similar to your process tests, except that you only mimic the client and not
+all processes your main process communicates with.
+
+Other applications (like, for instance, simulations) might create a database
+containing collected data. Here, you might check if these results match your
+expectations.
+
+Of course you can also combine these possibilities or do something completely
+different …
+
+
+“My Test Are now Running sooo Slow!”
+------------------------------------
+
+System and process tests often run much slower than simple unit tests, so you
+may want to skip them most of the time. Pytest allows you to `mark
+<http://pytest.org/latest/example/markers.html#mark-examples>`_ a test with a
+given name. You can then (de)select tests based on their mark when you invoke
+pytest.
+
+To mark a module e.g. as ``process`` test, just put a line ``pytestmark =
+pytest.mark.process`` somewhere in it. Likewise, you can add a ``pytestmark =
+pytest.mark.system`` to mark a module as system test.
+
+You can now deselect process and system tests:
+
+.. code-block:: bash
+
+    $ py.test -m "not (process or system)"
+
+You can put this into a *pytest.ini* as a default setting. To override this
+again, use ``1`` or ``True`` as selection expression:
+
+.. code-block:: bash
+
+    $ py.test -m 1
+
+
+Summary
+-------
+
+Process and system testing were the last two topics I wanted to cover in this
+series. Compared to simple unit tests, they require a bit more effort. I think
+they are definitely worth the extra work since they give you a lot more
+confidence that your program actually works, because they are much more
+realistic than unit tests can be.
+
+In the end, these articles became much longer and more elaborate then I
+originally planned them to be. However, I hope they provided a good overview
+about how to design and test applications with PyZMQ and I hope that you now
+run into much less problems than I did when I first started working with PyZMQ.
         return ['pong', num_pings]
 
 
-
 if __name__ == '__main__':
     pong_proc = PongProc(bind_addr=(host, port))
     pong_proc.start()

File test/data/pongproc.out

+Pong got request number 0
+Pong got request number 1
+Pong got request number 2
+Pong got request number 3
+Pong got request number 4
+Ping got reply: ['pong', 0]
+Ping got reply: ['pong', 1]
+Ping got reply: ['pong', 2]
+Ping got reply: ['pong', 3]
+Ping got reply: ['pong', 4]

File test/data/pongproc.txt

Empty file added.

File test/process/__init__.py

Empty file added.

File test/process/conftest.py

+from inspect import isfunction, isgeneratorfunction
+
+
+def pytest_pycollect_makeitem(collector, name, obj):
+    """
+    Collects all instance methods that are generators and returns them as
+    normal function items.
+
+    """
+    if collector.funcnamefilter(name) and hasattr(obj, '__call__'):
+        if isfunction(obj) or isgeneratorfunction(obj):
+            return collector._genfunctions(name, obj)
+
+
+def pytest_runtest_call(item):
+    """
+    Passes the test generator (``item.obj``) to the ``run()`` method of the
+    generator's instance. This method should be inherited from
+    :class:`test.support.ProcessTest`.
+
+    """
+    if isgeneratorfunction(item.obj):
+        item.obj.__self__.run(item.obj)
+    else:
+        item.runtest()

File test/process/test_pongproc.py

+import pytest
+import zmq
+
+from test.support import ProcessTest, make_sock
+import pongproc
+
+
+pytestmark = pytest.mark.process
+
+host = '127.0.0.1'
+port = 5678
+
+
+class TestProngProc(ProcessTest):
+    """Communication test for the Platform Manager process."""
+
+    def setup_method(self, method):
+        """
+        Creates and starts a pp process and sets up sockets to communicate
+        with it.
+
+        """
+        self.context = zmq.Context()
+
+        # Mimics the ping process
+        self.req_sock = make_sock(self.context, zmq.REQ,
+                                  connect=(host, port))
+
+        self.pp = pongproc.PongProc((host, port))
+        self.pp.start()
+
+    def teardown_method(self, method):
+        """
+        Sends a kill message to the pp and waits for the process to terminate.
+
+        """
+        # Send a stop message to the prong process and wait until it joins
+        self.req_sock.send_multipart([b'["plzdiekthxbye", null]'])
+        self.pp.join()
+
+        # Assert that no more messages are in the pipe
+        pytest.raises(zmq.ZMQError, self.req_sock.recv_multipart)
+
+        self.req_sock.close()
+
+    def test_ping(self):
+        """Tests a ping-pong sequence."""
+        yield ('send', self.req_sock, [], ['ping', 1])
+
+        reply = yield ('recv', self.req_sock)
+        assert reply == [['pong', 1]]

File test/support.py

+"""
+This module contains various functions and classes to support testing.
+
+"""
+import time
+import sys
+
+from zmq.utils import jsonapi as json
+import zmq
+
+
+def make_sock(context, sock_type, bind=None, connect=None):
+    """
+    Creates a *sock_type* typed socket and binds or connects it to the given
+    address.
+
+    """
+    sock = TestSocket(context, sock_type)
+    if bind:
+        sock.bind('tcp://%s:%s' % bind)
+    elif connect:
+        sock.connect('tcp://%s:%s' % connect)
+
+    return sock
+
+
+def get_forwarder(func):
+    """Returns a simple wrapper for *func*."""
+    def forwarder(*args, **kwargs):
+        return func(*args, **kwargs)
+
+    return forwarder
+
+
+def get_wrapped_fwd(func):
+    """
+    Returns a wrapper, that tries to call *func* multiple time in non-blocking
+    mode before rasing an :class:`zmq.ZMQError`.
+
+    """
+    def forwarder(*args, **kwargs):
+        for i in range(100):
+            try:
+                rep = func(*args, flags=zmq.NOBLOCK, **kwargs)
+                return rep
+
+            except zmq.ZMQError:
+                time.sleep(0.01)
+
+        msg = 'Could not %s message.' % func.__name__[:4]
+        raise zmq.ZMQError(msg)
+
+    return forwarder
+
+
+class TestSocket(object):
+    """
+    Wraps ZMQ :class:`~zmq.core.socket.Socket`. All *recv* and *send* methods
+    will be called multiple times in non-blocking mode before a
+    :class:`zmq.ZMQError` is raised.
+
+    """
+    def __init__(self, context, sock_type):
+        self._context = context
+
+        sock = context.socket(sock_type)
+        self._sock = sock
+
+        forwards = [  # These methods can simply be forwarded
+            sock.bind,
+            sock.bind_to_random_port,
+            sock.connect,
+            sock.close,
+            sock.setsockopt,
+        ]
+        wrapped_fwd = [  # These methods are wrapped with a for loop
+            sock.recv,
+            sock.recv_json,
+            sock.recv_multipart,
+            sock.recv_unicode,
+            sock.send,
+            sock.send_json,
+            sock.send_multipart,
+            sock.send_unicode,
+        ]
+
+        for func in forwards:
+            setattr(self, func.__name__, get_forwarder(func))
+
+        for func in wrapped_fwd:
+            setattr(self, func.__name__, get_wrapped_fwd(func))
+
+
+class ProcessTest(object):
+    """
+    Base class for process tests. It offers basic actions for sending and
+    receiving messages and implements the *run* methods that handles the
+    actual test generators.
+
+    """
+    def send(self, socket, header, body, extra_data=[]):
+        """
+        JSON-encodes *body*, concatenates it with *header*, appends
+        *extra_data* and sends it as multipart message over *socket*.
+
+        *header* and *extra_data* should be lists containg byte objects or
+        objects implementing the buffer interface (like NumPy arrays).
+
+        """
+        socket.send_multipart(header + [json.dumps(body)] + extra_data)
+
+    def recv(self, socket, json_load_index=-1):
+        """
+        Receives and returns a multipart message from *socket* and tries to
+        JSON-decode the item at position *json_load_index* (defaults to ``-1``;
+        the last element in the list). The original byte string will be
+        replaced by the loaded object. Set *json_load_index* to ``None`` to get
+        the original, unchanged message.
+
+        """
+        msg = socket.recv_multipart()
+        if json_load_index is not None:
+            msg[json_load_index] = json.loads(msg[json_load_index])
+        return msg
+
+    def run(self, testfunc):
+        """
+        Iterates over the *testfunc* generator and executes all actions it
+        yields. Results will be sent back into the generator.
+
+        :param testfunc: A generator function that yields tuples containing
+                an action keyword, which should be a function of this or
+                the inheriting class (like ``send`` or ``recv``) and additional
+                parameters that will be passed to that function, e.g.:
+                ``('send', socket_obj, ['header'], 'body')``
+        :type testfunc:  generatorfunction
+
+        """
+        item_gen = testfunc()
+        item = next(item_gen)
+
+        def throw_err(skip_levels=0):
+            """
+            Throws the last error to *item_gen* and skips *skip_levels* in
+            the traceback to point to the line that yielded the last event.
+
+            """
+            etype, evalue, tb = sys.exc_info()
+            for i in range(skip_levels):
+                tb = tb.tb_next
+            item_gen.throw(etype, evalue, tb)
+
+        try:
+            while True:
+                try:
+                    ret = getattr(self, item[0])(*item[1:])
+                    item = item_gen.send(ret)
+
+                except zmq.ZMQError:
+                    throw_err(3)  # PyZMQ could not send/recv
+                except AssertionError:
+                    throw_err(1)  # Error in the test
+        except StopIteration:
+            pass

File test/system/__init__.py

Empty file added.

File test/system/test_pongproc.py

+import os.path
+import subprocess
+
+import pytest
+
+
+pytestmark = pytest.mark.system
+
+
+def test_pongproc():
+    filename = os.path.join('test', 'data', 'pongproc.out')
+    expected = open(filename).read()
+
+    output = subprocess.check_output(['python', 'pongproc.py'],
+                                     universal_newlines=True)
+
+    assert output == expected

File test/test_pongproc.py

 
     def test_stop(self, pp):
         pp.loop = mock.Mock()
-
         pp.stop()
-
         assert pp.loop.stop.call_count == 1
 
     @pytest.mark.parametrize(('handler', 'msg'), [
 
     def test_stop_msg(self, pp):
         pp.stop = mock.Mock()
-
         pp.handle_rep_stream([b'["plzdiekthxbye", null]'])
-
         assert pp.stop.call_count == 1
 
     def test_ping(self, pp):