Commits

Brandon Rhodes  committed 8a1041c

Updated to upstream util-linux-2.19, zeromq-2.1.4, and pyzmq-2.1.1.

  • Participants
  • Parent commits 2a63812

Comments (0)

Files changed (212)

 ^build$
 ^dist$
 \.egg-info$
-^pyzmq-2.0.8$
-^util-linux-ng-2.18$
+^pyzmq-2.1.1$
+^util-linux-2.19$
 ^tmp$
-^zeromq-2.0.9$
+^zeromq-2.1.4$
 #!/bin/bash
 
 # Die on error
+
 set -e
 
 # Re-fetch the sources necessary for building pyzmq-static.
+
 cd $(dirname "$0")
 mkdir -p tmp
 
-UTIL=util-linux-ng-2.18
-ZEROMQ=zeromq-2.0.10
-PYZMQ=pyzmq-2.0.8
+UTIL=util-linux-2.19
+ZEROMQ=zeromq-2.1.4
+PYZMQ=pyzmq-2.1.1
 
 # Download source distributions, or make sure they are up to date.
+
 cd tmp
-wget -c "http://www.kernel.org/pub/linux/utils/util-linux-ng/v2.18/$UTIL.tar.gz"
-wget -c "http://www.zeromq.org/local--files/area:download/$ZEROMQ.tar.gz"
-wget -c "http://pypi.python.org/packages/source/p/pyzmq/$PYZMQ.tar.gz#md5=b21ab6bc336c211c504068ecc55bc5cf"
+wget -c "http://www.kernel.org/pub/linux/utils/util-linux-ng/v2.19/$UTIL.tar.gz"
+wget -c "http://download.zeromq.org/$ZEROMQ.tar.gz"
+wget -c "http://pypi.python.org/packages/source/p/pyzmq/pyzmq-2.1.1.tar.gz#md5=f1d52b8bdf1f5f1e34b2c545da87a1e0"
 cd ..
 
 # Untar them.
+
 tar xvfz tmp/$UTIL.tar.gz
 tar xvfz tmp/$ZEROMQ.tar.gz
 tar xvfz tmp/$PYZMQ.tar.gz
 # Copy the files we need into our version-controlled directories.  (We
 # keep include_linux and include_darwin from one run to the next, since
 # we cannot replace their contents unless we are on that platform.)
+
 rm -rf include include_uuid licenses src src_nt src_uuid zmq
 mkdir  include include_uuid licenses src src_nt src_uuid zmq
 
    licenses
 
 cp $ZEROMQ/src/*.cpp \
-   $PYZMQ/zmq/_zmq.c \
    src
 cp $UTIL/shlibs/uuid/src/*.c \
    src_uuid
 
 cp $ZEROMQ/include/*.h* \
    $ZEROMQ/src/*.h* \
-   $PYZMQ/zmq/*.h \
+   $PYZMQ/zmq/utils/*.h \
    include
 
 mkdir include_uuid/uuid
 cp $UTIL/shlibs/uuid/src/*.h include_uuid      # where uuid expects it
 cp $UTIL/shlibs/uuid/src/*.h include_uuid/uuid # where ZeroMQ expects it
 
-cp -r $PYZMQ/zmq/*.py $PYZMQ/zmq/eventloop $PYZMQ/zmq/tests zmq
+(cd $PYZMQ/zmq; tar cf - *.py */*.py */*.c) | (cd zmq; tar xf -)
 
 cp $ZEROMQ/builds/msvc/platform.hpp include_nt
 
 cp $ZEROMQ/src/platform.hpp .
 
 # Remove the source trees.
+
 rm -rf $PYZMQ $UTIL $ZEROMQ

File include/allocate.h

+/*
+A utility to allocate a C array.
+
+This is excerpted from mpi4py's "atimport.h" and is licensed under the BSD license.
+*/
+
+#include "Python.h"
+
+static PyObject * allocate(Py_ssize_t n, void **pp){
+  PyObject *ob;
+  if (n > PY_SSIZE_T_MAX)
+    return PyErr_NoMemory();
+  else if (n < 0) {
+    PyErr_SetString(PyExc_RuntimeError,
+                    "memory allocation with negative size");
+    return NULL;
+  }
+#if PY_VERSION_HEX >= 0x02060000
+  ob = PyByteArray_FromStringAndSize(NULL, (n==0) ? 1 : n);
+  if (ob && n==0 && (PyByteArray_Resize(ob, 0) < 0)) {
+    Py_DECREF(ob);
+    return NULL;
+  }
+  if (ob && pp)
+    *pp = (void *)PyByteArray_AS_STRING(ob);
+#else
+  {
+    void *p = PyMem_Malloc(n);
+    if (!p)
+      return PyErr_NoMemory();
+    ob = PyCObject_FromVoidPtr(p, PyMem_Free);
+    if (!ob)
+      PyMem_Free(p);
+    else if (pp)
+      *pp = p;
+  }
+#endif
+  return ob;
+}

File include/app_thread.hpp

-/*
-    Copyright (c) 2007-2010 iMatix Corporation
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
-
-    You should have received a copy of the Lesser GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_APP_THREAD_HPP_INCLUDED__
-#define __ZMQ_APP_THREAD_HPP_INCLUDED__
-
-#include <vector>
-
-#include "stdint.hpp"
-#include "object.hpp"
-#include "yarray.hpp"
-#include "signaler.hpp"
-
-namespace zmq
-{
-
-    class app_thread_t : public object_t
-    {
-    public:
-
-        app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
-
-        ~app_thread_t ();
-
-        //  Interrupt blocking call if the app thread is stuck in one.
-        //  This function is is called from a different thread!
-        void stop ();
-
-        //  Returns signaler associated with this application thread.
-        signaler_t *get_signaler ();
-
-        //  Processes commands sent to this thread (if any). If 'block' is
-        //  set to true, returns only after at least one command was processed.
-        //  If throttle argument is true, commands are processed at most once
-        //  in a predefined time period. The function returns false is the
-        //  associated context was terminated, true otherwise.
-        bool process_commands (bool block_, bool throttle_);
-
-        //  Create a socket of a specified type.
-        class socket_base_t *create_socket (int type_);
-
-        //  Unregister the socket from the app_thread (called by socket itself).
-        void remove_socket (class socket_base_t *socket_);
-
-        //  Returns true is the associated context was already terminated.
-        bool is_terminated ();
-
-    private:
-
-        //  Command handlers.
-        void process_stop ();
-
-        //  All the sockets created from this application thread.
-        typedef yarray_t <socket_base_t> sockets_t;
-        sockets_t sockets;
-
-        //  App thread's signaler object.
-        signaler_t signaler;
-
-        //  Timestamp of when commands were processed the last time.
-        uint64_t last_processing_time;
-
-        //  If true, 'stop' command was already received.
-        bool terminated;
-
-        app_thread_t (const app_thread_t&);
-        void operator = (const app_thread_t&);
-    };
-
-}
-
-#endif

File include/array.hpp

+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ARRAY_INCLUDED__
+#define __ZMQ_ARRAY_INCLUDED__
+
+#include <vector>
+#include <algorithm>
+
+namespace zmq
+{
+
+    //  Base class for objects stored in the array. Note that each object can
+    //  be stored in at most one array.
+
+    class array_item_t
+    {
+    public:
+
+        inline array_item_t () :
+            array_index (-1)
+        {
+        }
+
+        //  The destructor doesn't have to be virtual. It is mad virtual
+        //  just to keep ICC and code checking tools from complaining.
+        inline virtual ~array_item_t ()
+        {
+        }
+
+        inline void set_array_index (int index_)
+        {
+            array_index = index_;
+        }
+
+        inline int get_array_index ()
+        {
+            return array_index;
+        }
+
+    private:
+
+        int array_index;
+
+        array_item_t (const array_item_t&);
+        const array_item_t &operator = (const array_item_t&);
+    };
+
+    //  Fast array implementation with O(1) access to item, insertion and
+    //  removal. Array stores pointers rather than objects. The objects have
+    //  to be derived from array_item_t class.
+
+    template <typename T> class array_t
+    {
+    public:
+
+        typedef typename std::vector <T*>::size_type size_type;
+
+        inline array_t ()
+        {
+        }
+
+        inline ~array_t ()
+        {
+        }
+
+        inline size_type size ()
+        {
+            return items.size ();
+        }
+
+        inline bool empty ()
+        {
+            return items.empty ();
+        }
+
+        inline T *&operator [] (size_type index_)
+        {
+            return items [index_];
+        }
+
+        inline void push_back (T *item_)
+        {
+            if (item_)
+                item_->set_array_index (items.size ());
+            items.push_back (item_);
+        }
+
+        inline void erase (T *item_) {
+            erase (item_->get_array_index ());
+        }
+
+        inline void erase (size_type index_) {
+            if (items.back ())
+                items.back ()->set_array_index (index_);
+            items [index_] = items.back ();
+            items.pop_back ();
+        }
+
+        inline void swap (size_type index1_, size_type index2_)
+        {
+            if (items [index1_])
+                items [index1_]->set_array_index (index2_);
+            if (items [index2_])
+                items [index2_]->set_array_index (index1_);
+            std::swap (items [index1_], items [index2_]);
+        }
+
+        inline void clear ()
+        {
+            items.clear ();
+        }
+
+        inline size_type index (T *item_)
+        {
+            return (size_type) item_->get_array_index ();
+        }
+
+    private:
+
+        typedef std::vector <T*> items_t;
+        items_t items;
+
+        array_t (const array_t&);
+        const array_t &operator = (const array_t&);
+    };
+
+}
+
+#endif

File include/atomic_counter.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #endif
 
         atomic_counter_t (const atomic_counter_t&);
-        void operator = (const atomic_counter_t&);
+        const atomic_counter_t& operator = (const atomic_counter_t&);
     };
 
 }

File include/atomic_ptr.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #endif
 
         atomic_ptr_t (const atomic_ptr_t&);
-        void operator = (const atomic_ptr_t&);
+        const atomic_ptr_t &operator = (const atomic_ptr_t&);
     };
 
 }

File include/blob.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 

File include/clock.hpp

+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_CLOCK_HPP_INCLUDED__
+#define __ZMQ_CLOCK_HPP_INCLUDED__
+
+#include "stdint.hpp"
+
+namespace zmq
+{
+
+    class clock_t
+    {
+    public:
+
+        clock_t ();
+        ~clock_t ();
+
+        //  CPU's timestamp counter. Returns 0 if it's not available.
+        static uint64_t rdtsc ();
+
+        //  High precision timestamp.
+        static uint64_t now_us ();
+
+        //  Low precision timestamp. In tight loops generating it can be
+        //  10 to 100 times faster than the high precision timestamp.
+        uint64_t now_ms ();
+
+    private:
+
+        //  TSC timestamp of when last time measurement was made.
+        uint64_t last_tsc;
+
+        //  Physical time corresponding to the TSC above (in milliseconds).
+        uint64_t last_time;
+
+        clock_t (const clock_t&);
+        const clock_t &operator = (const clock_t&);
+    };
+
+}
+
+#endif

File include/command.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
             own,
             attach,
             bind,
-            revive,
-            reader_info,
+            activate_reader,
+            activate_writer,
             pipe_term,
             pipe_term_ack,
             term_req,
             term,
-            term_ack
+            term_ack,
+            reap,
+            reaped,
+            done
         } type;
 
         union {
 
             //  Sent to socket to let it know about the newly created object.
             struct {
-                class owned_t *object;
+                class own_t *object;
             } own;
 
-            //  Attach the engine to the session.
+            //  Attach the engine to the session. If engine is NULL, it informs
+            //  session that the connection have failed.
             struct {
                 struct i_engine *engine;
                 unsigned char peer_identity_size;
             //  Sent by pipe writer to inform dormant pipe reader that there
             //  are messages in the pipe.
             struct {
-            } revive;
+            } activate_reader;
 
-            //  Sent by pipe reader to inform pipe writer
-            //  about how many messages it has read so far.
-            //  Used to implement the flow control.
+            //  Sent by pipe reader to inform pipe writer about how many
+            //  messages it has read so far.
             struct {
                 uint64_t msgs_read;
-            } reader_info;
+            } activate_writer;
 
             //  Sent by pipe reader to pipe writer to ask it to terminate
             //  its end of the pipe.
             //  Sent by I/O object ot the socket to request the shutdown of
             //  the I/O object.
             struct {
-                class owned_t *object;
+                class own_t *object;
             } term_req;
 
             //  Sent by socket to I/O object to start its shutdown.
             struct {
+                int linger;
             } term;
 
             //  Sent by I/O object to the socket to acknowledge it has
             struct {
             } term_ack;
 
+            //  Transfers the ownership of the closed socket
+            //  to the reaper thread.
+            struct {
+                class socket_base_t *socket;
+            } reap;
+
+            //  Closed socket notifies the reaper that it's already deallocated.
+            struct {
+            } reaped;
+
+            //  Sent by reaper thread to the term thread when all the sockets
+            //  are successfully deallocated.
+            struct {
+            } done;
+
         } args;
     };
 

File include/config.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 
     enum 
     {
-        //  Maximal number of OS threads that can own 0MQ sockets
-        //  at the same time.
-        max_app_threads = 512,
+        //  Maximum number of sockets that can be opened at the same time.
+        max_sockets = 512,
 
         //  Number of new messages in message pipe needed to trigger new memory
         //  allocation. Setting this parameter to 256 decreases the impact of
         //  memory allocation by approximately 99.6%
         message_pipe_granularity = 256,
 
-        //  Number of signals that can be read by the signaler
-        //  using a single system call.
-        signal_buffer_size = 8,
-
         //  Determines how often does socket poll for new commands when it
         //  still has unprocessed messages to handle. Thus, if it is set to 100,
         //  socket will process 100 inbound messages before doing the poll.
         //  Maximal delta between high and low watermark.
         max_wm_delta = 1024,
 
+        //  Swap inteligently batches data for writing to disk. The size of
+        //  the batch in bytes is specified by this option.
+        swap_block_size = 8192,
+
         //  Maximum number of events the I/O thread can process in one go.
         max_io_events = 256,
 
-        //  Maximal wait time for a timer (milliseconds).
-        max_timer_period = 100,
-
         //  Maximal delay to process command in API thread (in CPU ticks).
         //  3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs.
+        //  Note that delay is only applied when there is continuous stream of
+        //  messages to process. If not so, commands are processed immediately.
         max_command_delay = 3000000,
 
-        //  Maximal number of non-accepted connections that can be held by
-        //  TCP listener object.
-        tcp_connection_backlog = 10,
+        //  Low-precision clock precision in CPU ticks. 1ms. Value of 1000000
+        //  should be OK for CPU frequencies above 1GHz. If should work
+        //  reasonably well for CPU frequencies above 500MHz. For lower CPU
+        //  frequencies you may consider lowering this value to get best
+        //  possible latencies.
+        clock_precision = 1000000,
 
         //  Maximum transport data unit size for PGM (TPDU).
         pgm_max_tpdu = 1500

File include/connect_session.hpp

+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
+#define __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
+
+#include <string>
+
+#include "session.hpp"
+
+namespace zmq
+{
+
+    //  Connect session contains an address to connect to. On disconnect it
+    //  attempts to reconnect.
+
+    class connect_session_t : public session_t
+    {
+    public:
+
+        connect_session_t (class io_thread_t *io_thread_,
+            class socket_base_t *socket_, const options_t &options_,
+            const char *protocol_, const char *address_);
+        ~connect_session_t ();
+
+    private:
+
+        //  Handlers for events from session base class.
+        void attached (const blob_t &peer_identity_);
+        void detached ();
+
+        //  Start the connection process.
+        void start_connecting (bool wait_);
+
+        //  Command handlers.
+        void process_plug ();
+
+        //  Address to connect to.
+        std::string protocol;
+        std::string address;
+
+        connect_session_t (const connect_session_t&);
+        const connect_session_t &operator = (const connect_session_t&);
+    };
+
+}
+
+#endif

File include/ctx.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #ifndef __ZMQ_CTX_HPP_INCLUDED__
 #define __ZMQ_CTX_HPP_INCLUDED__
 
+#include <map>
 #include <vector>
-#include <set>
-#include <map>
 #include <string>
+#include <stdarg.h>
 
-#include "signaler.hpp"
+#include "../include/zmq.h"
+
+#include "mailbox.hpp"
+#include "semaphore.hpp"
 #include "ypipe.hpp"
+#include "array.hpp"
 #include "config.hpp"
 #include "mutex.hpp"
 #include "stdint.hpp"
 #include "thread.hpp"
+#include "options.hpp"
 
 namespace zmq
 {
+    //  Information associated with inproc endpoint. Note that endpoint options
+    //  are registered as well so that the peer can access them without a need
+    //  for synchronisation, handshaking or similar.
+    struct endpoint_t
+    {
+        class socket_base_t *socket;
+        options_t options;
+    };
 
     //  Context object encapsulates all the global state associated with
     //  the library.
         //  no more sockets open it'll cause all the infrastructure to be shut
         //  down. If there are open sockets still, the deallocation happens
         //  after the last one is closed.
-        int term ();
+        int terminate ();
 
-        //  Create a socket.
+        //  Create and destroy a socket.
         class socket_base_t *create_socket (int type_);
-
-        //  Destroy a socket.
-        void destroy_socket ();
-
-        //  Called by app_thread_t when it has no more sockets. The function
-        //  should disassociate the object from the current OS thread.
-        void no_sockets (class app_thread_t *thread_);
+        void destroy_socket (class socket_base_t *socket_);
 
         //  Send command to the destination thread.
-        void send_command (uint32_t destination_, const command_t &command_);
-
-        //  Receive command from another thread.
-        bool recv_command (uint32_t thread_slot_, command_t *command_,
-            bool block_);
+        void send_command (uint32_t tid_, const command_t &command_);
 
         //  Returns the I/O thread that is the least busy at the moment.
-        //  Taskset specifies which I/O threads are eligible (0 = all).
-        class io_thread_t *choose_io_thread (uint64_t taskset_);
+        //  Affinity specifies which I/O threads are eligible (0 = all).
+        //  Returns NULL is no I/O thread is available.
+        class io_thread_t *choose_io_thread (uint64_t affinity_);
 
-        //  All pipes are registered with the context so that even the
-        //  orphaned pipes can be deallocated on the terminal shutdown.
-        void register_pipe (class pipe_t *pipe_);
-        void unregister_pipe (class pipe_t *pipe_);
+        //  Returns reaper thread object.
+        class object_t *get_reaper ();
 
         //  Management of inproc endpoints.
-        int register_endpoint (const char *addr_, class socket_base_t *socket_);
+        int register_endpoint (const char *addr_, endpoint_t &endpoint_);
         void unregister_endpoints (class socket_base_t *socket_);
-        class socket_base_t *find_endpoint (const char *addr_);
+        endpoint_t find_endpoint (const char *addr_);
+
+        //  Logging.
+        void log (const char *format_, va_list args_);
+
+        enum {
+            term_tid = 0,
+            reaper_tid = 1
+        };
 
     private:
 
         ~ctx_t ();
 
-        struct app_thread_info_t
-        {
-            //  If false, 0MQ application thread is free, there's no associated
-            //  OS thread.
-            bool associated;
+        //  Sockets belonging to this context. We need the list so that
+        //  we can notify the sockets when zmq_term() is called. The sockets
+        //  will return ETERM then.
+        typedef array_t <socket_base_t> sockets_t;
+        sockets_t sockets;
 
-            //  ID of the associated OS thread. If 'associated' is false,
-            //  this field contains bogus data.
-            thread_t::id_t tid;
+        //  List of unused thread slots.
+        typedef std::vector <uint32_t> emtpy_slots_t;
+        emtpy_slots_t empty_slots;
 
-            //  Pointer to the 0MQ application thread object.
-            class app_thread_t *app_thread;
-        };
+        //  If true, zmq_term was already called.
+        bool terminating;
 
-        //  Application threads.
-        typedef std::vector <app_thread_info_t> app_threads_t;
-        app_threads_t app_threads;
+        //  Synchronisation of accesses to global slot-related data:
+        //  sockets, empty_slots, terminating. It also synchronises
+        //  access to zombie sockets as such (as oposed to slots) and provides
+        //  a memory barrier to ensure that all CPU cores see the same data.
+        mutex_t slot_sync;
 
-        //  Synchronisation of accesses to shared application thread data.
-        mutex_t app_threads_sync;
+        //  The reaper thread.
+        class reaper_t *reaper;
 
         //  I/O threads.
         typedef std::vector <class io_thread_t*> io_threads_t;
         io_threads_t io_threads;
 
-        //  Array of pointers to signalers for both application and I/O threads.
-        int signalers_count;
-        signaler_t **signalers;
+        //  Array of pointers to mailboxes for both application and I/O threads.
+        uint32_t slot_count;
+        mailbox_t **slots;
 
-        //  As pipes may reside in orphaned state in particular moments
-        //  of the pipe shutdown process, i.e. neither pipe reader nor
-        //  pipe writer hold reference to the pipe, we have to hold references
-        //  to all pipes in context so that we can deallocate them
-        //  during terminal shutdown even though it conincides with the
-        //  pipe being in the orphaned state.
-        typedef std::set <class pipe_t*> pipes_t;
-        pipes_t pipes;
-
-        //  Synchronisation of access to the pipes repository.
-        mutex_t pipes_sync;
-
-        //  Number of sockets alive.
-        int sockets;
-
-        //  If true, zmq_term was already called. When last socket is closed
-        //  the whole 0MQ infrastructure should be deallocated.
-        bool terminated;
-
-        //  Synchronisation of access to the termination data (socket count
-        //  and 'terminated' flag).
-        mutex_t term_sync;
+        //  Mailbox for zmq_term thread.
+        mailbox_t term_mailbox;
 
         //  List of inproc endpoints within this context.
-        typedef std::map <std::string, class socket_base_t*> endpoints_t;
+        typedef std::map <std::string, endpoint_t> endpoints_t;
         endpoints_t endpoints;
 
         //  Synchronisation of access to the list of inproc endpoints.
         mutex_t endpoints_sync;
 
+        //  PUB socket for logging. The socket is shared among all the threads,
+        //  thus it is synchronised by a mutex.
+        class socket_base_t *log_socket;
+        mutex_t log_sync;
+
         ctx_t (const ctx_t&);
-        void operator = (const ctx_t&);
+        const ctx_t &operator = (const ctx_t&);
     };
     
 }

File include/decoder.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 
 #include "err.hpp"
 
+#include "../include/zmq.h"
+
 namespace zmq
 {
 
     //  Helper base class for decoders that know the amount of data to read
     //  in advance at any moment. Knowing the amount in advance is a property
-    //  of the protocol used. Both AMQP and backend protocol are based on
-    //  size-prefixed paradigm, therefore they are using decoder_t to parse
-    //  the messages. On the other hand, XML-based transports (like XMPP or
-    //  SOAP) don't allow for knowing the size of data to read in advance and
-    //  should use different decoding algorithms.
+    //  of the protocol used. 0MQ framing protocol is based size-prefixed
+    //  paradigm, whixh qualifies it to be parsed by this class.
+    //  On the other hand, XML-based transports (like XMPP or SOAP) don't allow
+    //  for knowing the size of data to read in advance and should use different
+    //  decoding algorithms.
     //
-    //  Decoder implements the state machine that parses the incoming buffer.
+    //  This class implements the state machine that parses the incoming buffer.
     //  Derived class should implement individual state machine actions.
 
-    template <typename T> class decoder_t
+    template <typename T> class decoder_base_t
     {
     public:
 
-        inline decoder_t (size_t bufsize_) :
+        inline decoder_base_t (size_t bufsize_) :
             read_pos (NULL),
             to_read (0),
             next (NULL),
             bufsize (bufsize_)
         {
             buf = (unsigned char*) malloc (bufsize_);
-            zmq_assert (buf);
+            alloc_assert (buf);
         }
 
         //  The destructor doesn't have to be virtual. It is mad virtual
         //  just to keep ICC and code checking tools from complaining.
-        inline virtual ~decoder_t ()
+        inline virtual ~decoder_base_t ()
         {
             free (buf);
         }
         //  bytes actually processed.
         inline size_t process_buffer (unsigned char *data_, size_t size_)
         {
+            //  Check if we had an error in previous attempt.
+            if (unlikely (!(static_cast <T*> (this)->next)))
+                return (size_t) -1;
+
             //  In case of zero-copy simply adjust the pointers, no copying
             //  is required. Also, run the state machine in case all the data
             //  were processed.
                 read_pos += size_;
                 to_read -= size_;
 
-                while (!to_read)
-                    if (!(static_cast <T*> (this)->*next) ())
+                while (!to_read) {
+                    if (!(static_cast <T*> (this)->*next) ()) {
+                        if (unlikely (!(static_cast <T*> (this)->next)))
+                            return (size_t) -1;
                         return size_;
+                    }
+                }
                 return size_;
             }
 
 
                 //  Try to get more space in the message to fill in.
                 //  If none is available, return.
-                while (!to_read)
-                    if (!(static_cast <T*> (this)->*next) ())
+                while (!to_read) {
+                    if (!(static_cast <T*> (this)->*next) ()) {
+                        if (unlikely (!(static_cast <T*> (this)->next)))
+                            return (size_t) -1;
                         return pos;
+                    }
+                }
 
                 //  If there are no more data in the buffer, return.
                 if (pos == size_)
             next = next_;
         }
 
+        //  This function should be called from the derived class to
+        //  abort decoder state machine.
+        inline void decoding_error ()
+        {
+            next = NULL;
+        }
+
     private:
 
         unsigned char *read_pos;
         size_t bufsize;
         unsigned char *buf;
 
+        decoder_base_t (const decoder_base_t&);
+        const decoder_base_t &operator = (const decoder_base_t&);
+    };
+
+    //  Decoder for 0MQ framing protocol. Converts data batches into messages.
+
+    class decoder_t : public decoder_base_t <decoder_t>
+    {
+    public:
+
+        decoder_t (size_t bufsize_);
+        ~decoder_t ();
+
+        void set_inout (struct i_inout *destination_);
+
+    private:
+
+        bool one_byte_size_ready ();
+        bool eight_byte_size_ready ();
+        bool flags_ready ();
+        bool message_ready ();
+
+        struct i_inout *destination;
+        unsigned char tmpbuf [8];
+        ::zmq_msg_t in_progress;
+
         decoder_t (const decoder_t&);
         void operator = (const decoder_t&);
     };
 }
 
 #endif
+

File include/device.hpp

+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DEVICE_HPP_INCLUDED__
+#define __ZMQ_DEVICE_HPP_INCLUDED__
+
+namespace zmq
+{
+
+    int device (class socket_base_t *insocket_,
+        class socket_base_t *outsocket_);
+
+}
+
+#endif

File include/devpoll.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 
 #include "platform.hpp"
 
-#if defined ZMQ_HAVE_SOLARIS || ZMQ_HAVE_HPUX
+#if defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_HPUX
 
 #include <vector>
 
 #include "fd.hpp"
 #include "thread.hpp"
-#include "atomic_counter.hpp"
+#include "poller_base.hpp"
 
 namespace zmq
 {
 
-    //  Implements socket polling mechanism using the Solaris-specific
-    //  "/dev/poll" interface.
+    //  Implements socket polling mechanism using the "/dev/poll" interface.
 
-    class devpoll_t
+    class devpoll_t : public poller_base_t
     {
     public:
 
         void reset_pollin (handle_t handle_);
         void set_pollout (handle_t handle_);
         void reset_pollout (handle_t handle_);
-        void add_timer (struct i_poll_events *events_);
-        void cancel_timer (struct i_poll_events *events_);
-        int get_load ();
         void start ();
         void stop ();
 
             bool accepted;
         };
 
-        std::vector <fd_entry_t> fd_table;
+        typedef std::vector <fd_entry_t> fd_table_t;
+        fd_table_t fd_table;
 
         typedef std::vector <fd_t> pending_list_t;
         pending_list_t pending_list;
         //  Pollset manipulation function.
         void devpoll_ctl (fd_t fd_, short events_);
 
-        //  List of all the engines waiting for the timer event.
-        typedef std::vector <struct i_poll_events*> timers_t;
-        timers_t timers;
-
         //  If true, thread is in the process of shutting down.
         bool stopping;
 
         //  Handle of the physical thread doing the I/O work.
         thread_t worker;
 
-        //  Load of the poller. Currently number of file descriptors
-        //  registered with the poller.
-        atomic_counter_t load;
-
         devpoll_t (const devpoll_t&);
-        void operator = (const devpoll_t&);
+        const devpoll_t &operator = (const devpoll_t&);
     };
 
 }

File include/dist.hpp

+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DIST_HPP_INCLUDED__
+#define __ZMQ_DIST_HPP_INCLUDED__
+
+#include <vector>
+
+#include "array.hpp"
+#include "pipe.hpp"
+
+namespace zmq
+{
+
+    //  Class manages a set of outbound pipes. It sends each messages to
+    //  each of them.
+    class dist_t : public i_writer_events
+    {
+    public:
+
+        dist_t (class own_t *sink_);
+        ~dist_t ();
+
+        void attach (writer_t *pipe_);
+        void terminate ();
+        int send (zmq_msg_t *msg_, int flags_);
+        bool has_out ();
+
+        //  i_writer_events interface implementation.
+        void activated (writer_t *pipe_);
+        void terminated (writer_t *pipe_);
+
+    private:
+
+        //  Write the message to the pipe. Make the pipe inactive if writing
+        //  fails. In such a case false is returned.
+        bool write (class writer_t *pipe_, zmq_msg_t *msg_);
+
+        //  Put the message to all active pipes.
+        void distribute (zmq_msg_t *msg_, int flags_);
+
+        //  Plug in all the delayed pipes.
+        void clear_new_pipes ();
+
+        //  List of outbound pipes.
+        typedef array_t <class writer_t> pipes_t;
+        pipes_t pipes;
+
+        //  List of new pipes that were not yet inserted into 'pipes' list.
+        //  These pipes are moves to 'pipes' list once the current multipart
+        //  message is fully sent. This way we avoid sending incomplete messages
+        //  to peers.
+        typedef std::vector <class writer_t*> new_pipes_t;
+        new_pipes_t new_pipes;
+
+        //  Number of active pipes. All the active pipes are located at the
+        //  beginning of the pipes array.
+        pipes_t::size_type active;
+
+        //  True if last we are in the middle of a multipart message.
+        bool more;
+
+        //  Object to send events to.
+        class own_t *sink;
+
+        //  If true, termination process is already underway.
+        bool terminating;
+
+        dist_t (const dist_t&);
+        const dist_t &operator = (const dist_t&);
+    };
+
+}
+
+#endif

File include/encoder.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #ifndef __ZMQ_ENCODER_HPP_INCLUDED__
 #define __ZMQ_ENCODER_HPP_INCLUDED__
 
-#include "platform.hpp"
-#if defined ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#endif
-
 #include <stddef.h>
 #include <string.h>
 #include <stdlib.h>
 
 #include "err.hpp"
 
+#include "../include/zmq.h"
+
 namespace zmq
 {
 
     //  fills the outgoing buffer. Derived classes should implement individual
     //  state machine actions.
 
-    template <typename T> class encoder_t
+    template <typename T> class encoder_base_t
     {
     public:
 
-        inline encoder_t (size_t bufsize_) :
+        inline encoder_base_t (size_t bufsize_) :
             bufsize (bufsize_)
         {
             buf = (unsigned char*) malloc (bufsize_);
-            zmq_assert (buf);
+            alloc_assert (buf);
         }
 
-        //  The destructor doesn't have to be virtual. It is mad virtual
+        //  The destructor doesn't have to be virtual. It is made virtual
         //  just to keep ICC and code checking tools from complaining.
-        inline virtual ~encoder_t ()
+        inline virtual ~encoder_base_t ()
         {
             free (buf);
         }
         size_t bufsize;
         unsigned char *buf;
 
-        encoder_t (const encoder_t&);
-        void operator = (const encoder_t&);
+        encoder_base_t (const encoder_base_t&);
+        void operator = (const encoder_base_t&);
     };
 
+    //  Encoder for 0MQ framing protocol. Converts messages into data batches.
+
+    class encoder_t : public encoder_base_t <encoder_t>
+    {
+    public:
+
+        encoder_t (size_t bufsize_);
+        ~encoder_t ();
+
+        void set_inout (struct i_inout *source_);
+
+    private:
+
+        bool size_ready ();
+        bool message_ready ();
+
+        struct i_inout *source;
+        ::zmq_msg_t in_progress;
+        unsigned char tmpbuf [10];
+
+        encoder_t (const encoder_t&);
+        const encoder_t &operator = (const encoder_t&);
+    };
 }
 
 #endif
+

File include/epoll.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 
 #include "fd.hpp"
 #include "thread.hpp"
-#include "atomic_counter.hpp"
+#include "poller_base.hpp"
 
 namespace zmq
 {
     //  This class implements socket polling mechanism using the Linux-specific
     //  epoll mechanism.
 
-    class epoll_t
+    class epoll_t : public poller_base_t
     {
     public:
 
         void reset_pollin (handle_t handle_);
         void set_pollout (handle_t handle_);
         void reset_pollout (handle_t handle_);
-        void add_timer (struct i_poll_events *events_);
-        void cancel_timer (struct i_poll_events *events_);
-        int get_load ();
         void start ();
         void stop ();
 
         typedef std::vector <poll_entry_t*> retired_t;
         retired_t retired;
 
-        //  List of all the engines waiting for the timer event.
-        typedef std::vector <struct i_poll_events*> timers_t;
-        timers_t timers;
-
         //  If true, thread is in the process of shutting down.
         bool stopping;
 
         //  Handle of the physical thread doing the I/O work.
         thread_t worker;
 
-        //  Load of the poller. Currently number of file descriptors
-        //  registered with the poller.
-        atomic_counter_t load;
-
         epoll_t (const epoll_t&);
-        void operator = (const epoll_t&);
+        const epoll_t &operator = (const epoll_t&);
     };
 
 }

File include/err.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include <netdb.h>
 #endif
 
+namespace zmq
+{
+    const char *errno_to_string (int errno_);
+}
+
 #ifdef ZMQ_HAVE_WINDOWS
 
 namespace zmq
 {
-
     const char *wsa_error ();
     void win_error (char *buffer_, size_t buffer_size_);
-    void wsa_error_to_errno ();
-  
+    void wsa_error_to_errno (); 
 }
 
 //  Provides convenient way to check WSA-style errors on Windows.
         }\
     } while (false)
 
-// Provides convenient way to check for POSIX errors.
+//  Provides convenient way to check for POSIX errors.
 #define posix_assert(x) \
     do {\
         if (unlikely (x)) {\
         }\
     } while (false)
 
-// Provides convenient way to check for errors from getaddrinfo.
+//  Provides convenient way to check for errors from getaddrinfo.
 #define gai_assert(x) \
     do {\
         if (unlikely (x)) {\
         }\
     } while (false)
 
+//  Provides convenient way to check whether memory allocation have succeeded.
+#define alloc_assert(x) \
+    do {\
+        if (unlikely (!x)) {\
+            fprintf (stderr, "FATAL ERROR: OUT OF MEMORY (%s:%d)\n",\
+                __FILE__, __LINE__);\
+            abort ();\
+        }\
+    } while (false)
+
 #endif
 
-#define zmq_not_implemented() \
-    do {\
-        fprintf (stderr, "Hic sunt leones (%s:%d)\n", __FILE__, __LINE__);\
-        abort ();\
-    } while (false)
+

File include/fd.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 

File include/forwarder.hpp

-/*
-    Copyright (c) 2007-2010 iMatix Corporation
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
-
-    You should have received a copy of the Lesser GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_FORWARDER_HPP_INCLUDED__
-#define __ZMQ_FORWARDER_HPP_INCLUDED__
-
-namespace zmq
-{
-
-    int forwarder (class socket_base_t *insocket_,
-        class socket_base_t *outsocket_);
-
-}
-
-#endif

File include/fq.hpp

 /*
-    Copyright (c) 2007-2010 iMatix Corporation
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
     This file is part of 0MQ.
 
     0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
+    the terms of the GNU Lesser General Public License as published by
     the Free Software Foundation; either version 3 of the License, or
     (at your option) any later version.
 
     0MQ is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
+    GNU Lesser General Public License for more details.
 
-    You should have received a copy of the Lesser GNU General Public License
+    You should have received a copy of the GNU Lesser General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #ifndef __ZMQ_FQ_HPP_INCLUDED__
 #define __ZMQ_FQ_HPP_INCLUDED__
 
-#include "yarray.hpp"
+#include "array.hpp"
+#include "pipe.hpp"
 
 namespace zmq
 {
     //  Class manages a set of inbound pipes. On receive it performs fair
     //  queueing (RFC970) so that senders gone berserk won't cause denial of
     //  service for decent senders.
-    class fq_t
+    class fq_t : public i_reader_events
     {
     public:
 
-        fq_t ();
+        fq_t (class own_t *sink_);
         ~fq_t ();
 
-        void attach (class reader_t *pipe_);
-        void detach (class reader_t *pipe_);
-        void kill (class reader_t *pipe_);
-        void revive (class reader_t *pipe_);
+        void attach (reader_t *pipe_);
+        void terminate ();
+
         int recv (zmq_msg_t *msg_, int flags_);
         bool has_in ();
 
+        //  i_reader_events implementation.
+        void activated (reader_t *pipe_);
+        void terminated (reader_t *pipe_);
+        void delimited (reader_t *pipe_);
+
     private:
 
         //  Inbound pipes.
-        typedef yarray_t <class reader_t> pipes_t;
+        typedef array_t <reader_t> pipes_t;
         pipes_t pipes;
 
         //  Number of active pipes. All the active pipes are located at the
         //  there are following parts still waiting in the current pipe.
         bool more;
 
+        //  Object to send events to.
+        class own_t *sink;
+
+        //  If true, termination process is already underway.
+        bool terminating;
+
         fq_t (const fq_t&);
-        void operator = (const fq_t&);
+        const fq_t &operator = (const fq_t&);
     };
 
 }

File include/i_endpoint.hpp

-/*
-    Copyright (c) 2007-2010 iMatix Corporation
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
-
-    You should have received a copy of the Lesser GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
-#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
-
-#include "blob.hpp"
-
-namespace zmq
-{
-
-    struct i_endpoint
-    {
-        virtual ~i_endpoint () {}
-
-        virtual void attach_pipes (class reader_t *inpipe_,
-            class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
-        virtual void detach_inpipe (class reader_t *pipe_) = 0;
-        virtual void detach_outpipe (class writer_t *pipe_) = 0;
-        virtual void kill (class reader_t *pipe_) = 0;
-        virtual void revive (class reader_t *pipe_) = 0;
-        virtual void revive (class writer_t *pipe_) = 0;
-    };
-
-}
-
-#endif