Brandon Rhodes avatar Brandon Rhodes committed fabb14c

Improved the "get.sh" script so it puts everything in a single "include"
and a single "src" directory. Well, except what Windows needs, which I
will tackle next.

Comments (0)

Files changed (146)

 tar xvfz tmp/zeromq-2.0.9.tar.gz
 tar xvfz tmp/pyzmq-2.0.7.tar.gz
 
-# Segregate out the UUID sources by operating system.
-rm -rf uuid
-mkdir uuid
-cp util-linux-ng-2.18/shlibs/uuid/src/*.c uuid
+# Generate platform.hpp from platform.hpp.in
+(cd zeromq-2.0.9; ./configure)
 
-mkdir uuid-nt
-cp util-linux-ng-2.18/shlibs/uuid/src/uuid.sym uuid-nt
-mv uuid/gen_uuid_nt.c uuid-nt
+# Copy the files we need into our version-controlled directories.
+rm -rf include src src_nt
+mkdir include src src_nt
+
+cp util-linux-ng-2.18/shlibs/uuid/src/*.c \
+   zeromq-2.0.9/src/*.cpp \
+   pyzmq-2.0.7/zmq/_zmq.c \
+   src
+mv src/gen_uuid_nt.c src_nt
+cp util-linux-ng-2.18/shlibs/uuid/src/uuid.sym src_nt
+
+cp util-linux-ng-2.18/shlibs/uuid/src/*.h \
+   zeromq-2.0.9/include/*.h* \
+   zeromq-2.0.9/src/*.h* \
+   pyzmq-2.0.7/zmq/*.h \
+   include

include/app_thread.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_APP_THREAD_HPP_INCLUDED__
+#define __ZMQ_APP_THREAD_HPP_INCLUDED__
+
+#include <vector>
+
+#include "stdint.hpp"
+#include "object.hpp"
+#include "yarray.hpp"
+#include "signaler.hpp"
+
+namespace zmq
+{
+
+    class app_thread_t : public object_t
+    {
+    public:
+
+        app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
+
+        ~app_thread_t ();
+
+        //  Interrupt blocking call if the app thread is stuck in one.
+        //  This function is is called from a different thread!
+        void stop ();
+
+        //  Returns signaler associated with this application thread.
+        signaler_t *get_signaler ();
+
+        //  Processes commands sent to this thread (if any). If 'block' is
+        //  set to true, returns only after at least one command was processed.
+        //  If throttle argument is true, commands are processed at most once
+        //  in a predefined time period. The function returns false is the
+        //  associated context was terminated, true otherwise.
+        bool process_commands (bool block_, bool throttle_);
+
+        //  Create a socket of a specified type.
+        class socket_base_t *create_socket (int type_);
+
+        //  Unregister the socket from the app_thread (called by socket itself).
+        void remove_socket (class socket_base_t *socket_);
+
+        //  Returns true is the associated context was already terminated.
+        bool is_terminated ();
+
+    private:
+
+        //  Command handlers.
+        void process_stop ();
+
+        //  All the sockets created from this application thread.
+        typedef yarray_t <socket_base_t> sockets_t;
+        sockets_t sockets;
+
+        //  App thread's signaler object.
+        signaler_t signaler;
+
+        //  Timestamp of when commands were processed the last time.
+        uint64_t last_processing_time;
+
+        //  If true, 'stop' command was already received.
+        bool terminated;
+
+        app_thread_t (const app_thread_t&);
+        void operator = (const app_thread_t&);
+    };
+
+}
+
+#endif

include/atomic_counter.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ATOMIC_COUNTER_HPP_INCLUDED__
+#define __ZMQ_ATOMIC_COUNTER_HPP_INCLUDED__
+
+#include "stdint.hpp"
+#include "platform.hpp"
+
+#if defined ZMQ_FORCE_MUTEXES
+#define ZMQ_ATOMIC_COUNTER_MUTEX
+#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
+#define ZMQ_ATOMIC_COUNTER_X86
+#elif defined ZMQ_HAVE_WINDOWS
+#define ZMQ_ATOMIC_COUNTER_WINDOWS
+#elif (defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_NETBSD)
+#define ZMQ_ATOMIC_COUNTER_ATOMIC_H
+#else
+#define ZMQ_ATOMIC_COUNTER_MUTEX
+#endif
+
+#if defined ZMQ_ATOMIC_COUNTER_MUTEX
+#include "mutex.hpp"
+#elif defined ZMQ_ATOMIC_COUNTER_WINDOWS
+#include "windows.hpp"
+#elif defined ZMQ_ATOMIC_COUNTER_ATOMIC_H
+#include <atomic.h>
+#endif
+
+namespace zmq
+{
+
+    //  This class represents an integer that can be incremented/decremented
+    //  in atomic fashion.
+
+    class atomic_counter_t
+    {
+    public:
+
+        typedef uint32_t integer_t;
+
+        inline atomic_counter_t (integer_t value_ = 0) :
+            value (value_)
+        {
+        }
+
+        inline ~atomic_counter_t ()
+        {
+        }
+
+        //  Set counter value (not thread-safe).
+        inline void set (integer_t value_)
+        {
+            value = value_;
+        }
+
+        //  Atomic addition. Returns the old value.
+        inline integer_t add (integer_t increment_)
+        {
+            integer_t old_value;
+
+#if defined ZMQ_ATOMIC_COUNTER_WINDOWS
+            old_value = InterlockedExchangeAdd ((LONG*) &value, increment_);
+#elif defined ZMQ_ATOMIC_COUNTER_ATOMIC_H
+            integer_t new_value = atomic_add_32_nv (&value, increment_);
+            old_value = new_value - increment_;
+#elif defined ZMQ_ATOMIC_COUNTER_X86
+            __asm__ volatile (
+                "lock; xadd %0, %1 \n\t"
+                : "=r" (old_value), "=m" (value)
+                : "0" (increment_), "m" (value)
+                : "cc", "memory");
+#elif defined ZMQ_ATOMIC_COUNTER_MUTEX
+            sync.lock ();
+            old_value = value;
+            value += increment_;
+            sync.unlock ();
+#else
+#error atomic_counter is not implemented for this platform
+#endif
+            return old_value;
+        }
+
+        //  Atomic subtraction. Returns false if the counter drops to zero.
+        inline bool sub (integer_t decrement)
+        {
+#if defined ZMQ_ATOMIC_COUNTER_WINDOWS
+            LONG delta = - ((LONG) decrement);
+            integer_t old = InterlockedExchangeAdd ((LONG*) &value, delta);
+            return old - decrement != 0;
+#elif defined ZMQ_ATOMIC_COUNTER_ATOMIC_H
+            int32_t delta = - ((int32_t) decrement);
+            integer_t nv = atomic_add_32_nv (&value, delta);
+            return nv != 0;
+#elif defined ZMQ_ATOMIC_COUNTER_X86
+            integer_t oldval = -decrement;
+            volatile integer_t *val = &value;
+            __asm__ volatile ("lock; xaddl %0,%1"
+                : "=r" (oldval), "=m" (*val)
+                : "0" (oldval), "m" (*val)
+                : "cc", "memory");
+            return oldval != decrement;
+#elif defined ZMQ_ATOMIC_COUNTER_MUTEX
+            sync.lock ();
+            value -= decrement;
+            bool result = value ? true : false;
+            sync.unlock ();
+            return result;
+#else
+#error atomic_counter is not implemented for this platform
+#endif
+        }
+
+        inline integer_t get ()
+        {
+            return value;
+        }
+
+    private:
+
+        volatile integer_t value;
+#if defined ZMQ_ATOMIC_COUNTER_MUTEX
+        mutex_t sync;
+#endif
+
+        atomic_counter_t (const atomic_counter_t&);
+        void operator = (const atomic_counter_t&);
+    };
+
+}
+
+//  Remove macros local to this file.
+#if defined ZMQ_ATOMIC_COUNTER_WINDOWS
+#undef ZMQ_ATOMIC_COUNTER_WINDOWS
+#endif
+#if defined ZMQ_ATOMIC_COUNTER_ATOMIC_H
+#undef ZMQ_ATOMIC_COUNTER_ATOMIC_H
+#endif
+#if defined ZMQ_ATOMIC_COUNTER_X86
+#undef ZMQ_ATOMIC_COUNTER_X86
+#endif
+#if defined ZMQ_ATOMIC_COUNTER_MUTEX
+#undef ZMQ_ATOMIC_COUNTER_MUTEX
+#endif
+
+#endif
+

include/atomic_ptr.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ATOMIC_PTR_HPP_INCLUDED__
+#define __ZMQ_ATOMIC_PTR_HPP_INCLUDED__
+
+#include "platform.hpp"
+
+#if defined ZMQ_FORCE_MUTEXES
+#define ZMQ_ATOMIC_PTR_MUTEX
+#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
+#define ZMQ_ATOMIC_PTR_X86
+#elif defined ZMQ_HAVE_WINDOWS
+#define ZMQ_ATOMIC_PTR_WINDOWS
+#elif (defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_NETBSD)
+#define ZMQ_ATOMIC_PTR_ATOMIC_H
+#else
+#define ZMQ_ATOMIC_PTR_MUTEX
+#endif
+
+#if defined ZMQ_ATOMIC_PTR_MUTEX
+#include "mutex.hpp"
+#elif defined ZMQ_ATOMIC_PTR_WINDOWS
+#include "windows.hpp"
+#elif defined ZMQ_ATOMIC_PTR_ATOMIC_H
+#include <atomic.h>
+#endif
+
+namespace zmq
+{
+
+    //  This class encapsulates several atomic operations on pointers.
+
+    template <typename T> class atomic_ptr_t
+    {
+    public:
+
+        //  Initialise atomic pointer
+        inline atomic_ptr_t ()
+        {
+            ptr = NULL;
+        }
+
+        //  Destroy atomic pointer
+        inline ~atomic_ptr_t ()
+        {
+        }
+
+        //  Set value of atomic pointer in a non-threadsafe way
+        //  Use this function only when you are sure that at most one
+        //  thread is accessing the pointer at the moment.
+        inline void set (T *ptr_)
+        {
+            this->ptr = ptr_;
+        }
+
+        //  Perform atomic 'exchange pointers' operation. Pointer is set
+        //  to the 'val' value. Old value is returned.
+        inline T *xchg (T *val_)
+        {
+#if defined ZMQ_ATOMIC_PTR_WINDOWS
+            return (T*) InterlockedExchangePointer ((PVOID*) &ptr, val_);
+#elif defined ZMQ_ATOMIC_PTR_ATOMIC_H
+            return (T*) atomic_swap_ptr (&ptr, val_);
+#elif defined ZMQ_ATOMIC_PTR_X86
+            T *old;
+            __asm__ volatile (
+                "lock; xchg %0, %2"
+                : "=r" (old), "=m" (ptr)
+                : "m" (ptr), "0" (val_));
+            return old;
+#elif defined ZMQ_ATOMIC_PTR_MUTEX
+            sync.lock ();
+            T *old = (T*) ptr;
+            ptr = val_;
+            sync.unlock ();
+            return old;
+#else
+#error atomic_ptr is not implemented for this platform
+#endif
+        }
+
+        //  Perform atomic 'compare and swap' operation on the pointer.
+        //  The pointer is compared to 'cmp' argument and if they are
+        //  equal, its value is set to 'val'. Old value of the pointer
+        //  is returned.
+        inline T *cas (T *cmp_, T *val_)
+        {
+#if defined ZMQ_ATOMIC_PTR_WINDOWS
+            return (T*) InterlockedCompareExchangePointer (
+                (volatile PVOID*) &ptr, val_, cmp_);
+#elif defined ZMQ_ATOMIC_PTR_ATOMIC_H
+            return (T*) atomic_cas_ptr (&ptr, cmp_, val_);
+#elif defined ZMQ_ATOMIC_PTR_X86
+            T *old;
+            __asm__ volatile (
+                "lock; cmpxchg %2, %3"
+                : "=a" (old), "=m" (ptr)
+                : "r" (val_), "m" (ptr), "0" (cmp_)
+                : "cc");
+            return old;
+#elif defined ZMQ_ATOMIC_PTR_MUTEX
+            sync.lock ();
+            T *old = (T*) ptr;
+            if (ptr == cmp_)
+                ptr = val_;
+            sync.unlock ();
+            return old;
+#else
+#error atomic_ptr is not implemented for this platform
+#endif
+        }
+
+    private:
+
+        volatile T *ptr;
+#if defined ZMQ_ATOMIC_PTR_MUTEX
+        mutex_t sync;
+#endif
+
+        atomic_ptr_t (const atomic_ptr_t&);
+        void operator = (const atomic_ptr_t&);
+    };
+
+}
+
+//  Remove macros local to this file.
+#if defined ZMQ_ATOMIC_PTR_WINDOWS
+#undef ZMQ_ATOMIC_PTR_WINDOWS
+#endif
+#if defined ZMQ_ATOMIC_PTR_ATOMIC_H
+#undef ZMQ_ATOMIC_PTR_ATOMIC_H
+#endif
+#if defined ZMQ_ATOMIC_PTR_X86
+#undef ZMQ_ATOMIC_PTR_X86
+#endif
+#if defined ZMQ_ATOMIC_PTR_MUTEX
+#undef ZMQ_ATOMIC_PTR_MUTEX
+#endif
+
+#endif
+
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_BLOB_HPP_INCLUDED__
+#define __ZMQ_BLOB_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+    //  Object to hold dynamically allocated opaque binary data.
+    typedef std::basic_string <unsigned char> blob_t;
+
+}
+
+#endif

include/command.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_COMMAND_HPP_INCLUDED__
+#define __ZMQ_COMMAND_HPP_INCLUDED__
+
+#include "stdint.hpp"
+
+namespace zmq
+{
+
+    //  This structure defines the commands that can be sent between threads.
+
+    struct command_t
+    {
+        //  Object to process the command.
+        class object_t *destination;
+
+        enum type_t
+        {
+            stop,
+            plug,
+            own,
+            attach,
+            bind,
+            revive,
+            reader_info,
+            pipe_term,
+            pipe_term_ack,
+            term_req,
+            term,
+            term_ack
+        } type;
+
+        union {
+
+            //  Sent to I/O thread to let it know that it should
+            //  terminate itself.
+            struct {
+            } stop;
+
+            //  Sent to I/O object to make it register with its I/O thread.
+            struct {
+            } plug;
+
+            //  Sent to socket to let it know about the newly created object.
+            struct {
+                class owned_t *object;
+            } own;
+
+            //  Attach the engine to the session.
+            struct {
+                struct i_engine *engine;
+                unsigned char peer_identity_size;
+                unsigned char *peer_identity;
+            } attach;
+
+            //  Sent from session to socket to establish pipe(s) between them.
+            //  Caller have used inc_seqnum beforehand sending the command.
+            struct {
+                class reader_t *in_pipe;
+                class writer_t *out_pipe;
+                unsigned char peer_identity_size;
+                unsigned char *peer_identity;
+            } bind;
+
+            //  Sent by pipe writer to inform dormant pipe reader that there
+            //  are messages in the pipe.
+            struct {
+            } revive;
+
+            //  Sent by pipe reader to inform pipe writer
+            //  about how many messages it has read so far.
+            //  Used to implement the flow control.
+            struct {
+                uint64_t msgs_read;
+            } reader_info;
+
+            //  Sent by pipe reader to pipe writer to ask it to terminate
+            //  its end of the pipe.
+            struct {
+            } pipe_term;
+
+            //  Pipe writer acknowledges pipe_term command.
+            struct {
+            } pipe_term_ack;
+
+            //  Sent by I/O object ot the socket to request the shutdown of
+            //  the I/O object.
+            struct {
+                class owned_t *object;
+            } term_req;
+
+            //  Sent by socket to I/O object to start its shutdown.
+            struct {
+            } term;
+
+            //  Sent by I/O object to the socket to acknowledge it has
+            //  shut down.
+            struct {
+            } term_ack;
+
+        } args;
+    };
+
+    //  Function to deallocate dynamically allocated components of the command.
+    void deallocate_command (command_t *cmd_);
+
+}    
+
+#endif

include/config.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_CONFIG_HPP_INCLUDED__
+#define __ZMQ_CONFIG_HPP_INCLUDED__
+
+namespace zmq
+{
+
+    //  Compile-time settings.
+
+    enum 
+    {
+        //  Maximal number of OS threads that can own 0MQ sockets
+        //  at the same time.
+        max_app_threads = 512,
+
+        //  Number of new messages in message pipe needed to trigger new memory
+        //  allocation. Setting this parameter to 256 decreases the impact of
+        //  memory allocation by approximately 99.6%
+        message_pipe_granularity = 256,
+
+        //  Number of signals that can be read by the signaler
+        //  using a single system call.
+        signal_buffer_size = 8,
+
+        //  Determines how often does socket poll for new commands when it
+        //  still has unprocessed messages to handle. Thus, if it is set to 100,
+        //  socket will process 100 inbound messages before doing the poll.
+        //  If there are no unprocessed messages available, poll is done
+        //  immediately. Decreasing the value trades overall latency for more
+        //  real-time behaviour (less latency peaks).
+        inbound_poll_rate = 100,
+
+        //  Maximal batching size for engines with receiving functionality.
+        //  So, if there are 10 messages that fit into the batch size, all of
+        //  them may be read by a single 'recv' system call, thus avoiding
+        //  unnecessary network stack traversals.
+        in_batch_size = 8192,
+
+        //  Maximal batching size for engines with sending functionality.
+        //  So, if there are 10 messages that fit into the batch size, all of
+        //  them may be written by a single 'send' system call, thus avoiding
+        //  unnecessary network stack traversals.
+        out_batch_size = 8192,
+
+        //  Maximal delta between high and low watermark.
+        max_wm_delta = 1024,
+
+        //  Maximum number of events the I/O thread can process in one go.
+        max_io_events = 256,
+
+        //  Maximal wait time for a timer (milliseconds).
+        max_timer_period = 100,
+
+        //  Maximal delay to process command in API thread (in CPU ticks).
+        //  3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs.
+        max_command_delay = 3000000,
+
+        //  Maximal number of non-accepted connections that can be held by
+        //  TCP listener object.
+        tcp_connection_backlog = 10,
+
+        //  Maximum transport data unit size for PGM (TPDU).
+        pgm_max_tpdu = 1500
+    };
+
+}
+
+#endif
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_CTX_HPP_INCLUDED__
+#define __ZMQ_CTX_HPP_INCLUDED__
+
+#include <vector>
+#include <set>
+#include <map>
+#include <string>
+
+#include "signaler.hpp"
+#include "ypipe.hpp"
+#include "config.hpp"
+#include "mutex.hpp"
+#include "stdint.hpp"
+#include "thread.hpp"
+
+namespace zmq
+{
+
+    //  Context object encapsulates all the global state associated with
+    //  the library.
+    
+    class ctx_t
+    {
+    public:
+
+        //  Create the context object. The argument specifies the size
+        //  of I/O thread pool to create.
+        ctx_t (uint32_t io_threads_);
+
+        //  This function is called when user invokes zmq_term. If there are
+        //  no more sockets open it'll cause all the infrastructure to be shut
+        //  down. If there are open sockets still, the deallocation happens
+        //  after the last one is closed.
+        int term ();
+
+        //  Create a socket.
+        class socket_base_t *create_socket (int type_);
+
+        //  Destroy a socket.
+        void destroy_socket ();
+
+        //  Called by app_thread_t when it has no more sockets. The function
+        //  should disassociate the object from the current OS thread.
+        void no_sockets (class app_thread_t *thread_);
+
+        //  Send command to the destination thread.
+        void send_command (uint32_t destination_, const command_t &command_);
+
+        //  Receive command from another thread.
+        bool recv_command (uint32_t thread_slot_, command_t *command_,
+            bool block_);
+
+        //  Returns the I/O thread that is the least busy at the moment.
+        //  Taskset specifies which I/O threads are eligible (0 = all).
+        class io_thread_t *choose_io_thread (uint64_t taskset_);
+
+        //  All pipes are registered with the context so that even the
+        //  orphaned pipes can be deallocated on the terminal shutdown.
+        void register_pipe (class pipe_t *pipe_);
+        void unregister_pipe (class pipe_t *pipe_);
+
+        //  Management of inproc endpoints.
+        int register_endpoint (const char *addr_, class socket_base_t *socket_);
+        void unregister_endpoints (class socket_base_t *socket_);
+        class socket_base_t *find_endpoint (const char *addr_);
+
+    private:
+
+        ~ctx_t ();
+
+        struct app_thread_info_t
+        {
+            //  If false, 0MQ application thread is free, there's no associated
+            //  OS thread.
+            bool associated;
+
+            //  ID of the associated OS thread. If 'associated' is false,
+            //  this field contains bogus data.
+            thread_t::id_t tid;
+
+            //  Pointer to the 0MQ application thread object.
+            class app_thread_t *app_thread;
+        };
+
+        //  Application threads.
+        typedef std::vector <app_thread_info_t> app_threads_t;
+        app_threads_t app_threads;
+
+        //  Synchronisation of accesses to shared application thread data.
+        mutex_t app_threads_sync;
+
+        //  I/O threads.
+        typedef std::vector <class io_thread_t*> io_threads_t;
+        io_threads_t io_threads;
+
+        //  Array of pointers to signalers for both application and I/O threads.
+        int signalers_count;
+        signaler_t **signalers;
+
+        //  As pipes may reside in orphaned state in particular moments
+        //  of the pipe shutdown process, i.e. neither pipe reader nor
+        //  pipe writer hold reference to the pipe, we have to hold references
+        //  to all pipes in context so that we can deallocate them
+        //  during terminal shutdown even though it conincides with the
+        //  pipe being in the orphaned state.
+        typedef std::set <class pipe_t*> pipes_t;
+        pipes_t pipes;
+
+        //  Synchronisation of access to the pipes repository.
+        mutex_t pipes_sync;
+
+        //  Number of sockets alive.
+        int sockets;
+
+        //  If true, zmq_term was already called. When last socket is closed
+        //  the whole 0MQ infrastructure should be deallocated.
+        bool terminated;
+
+        //  Synchronisation of access to the termination data (socket count
+        //  and 'terminated' flag).
+        mutex_t term_sync;
+
+        //  List of inproc endpoints within this context.
+        typedef std::map <std::string, class socket_base_t*> endpoints_t;
+        endpoints_t endpoints;
+
+        //  Synchronisation of access to the list of inproc endpoints.
+        mutex_t endpoints_sync;
+
+        ctx_t (const ctx_t&);
+        void operator = (const ctx_t&);
+    };
+    
+}
+
+#endif
+

include/decoder.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DECODER_HPP_INCLUDED__
+#define __ZMQ_DECODER_HPP_INCLUDED__
+
+#include <stddef.h>
+#include <string.h>
+#include <stdlib.h>
+#include <algorithm>
+
+#include "err.hpp"
+
+namespace zmq
+{
+
+    //  Helper base class for decoders that know the amount of data to read
+    //  in advance at any moment. Knowing the amount in advance is a property
+    //  of the protocol used. Both AMQP and backend protocol are based on
+    //  size-prefixed paradigm, therefore they are using decoder_t to parse
+    //  the messages. On the other hand, XML-based transports (like XMPP or
+    //  SOAP) don't allow for knowing the size of data to read in advance and
+    //  should use different decoding algorithms.
+    //
+    //  Decoder implements the state machine that parses the incoming buffer.
+    //  Derived class should implement individual state machine actions.
+
+    template <typename T> class decoder_t
+    {
+    public:
+
+        inline decoder_t (size_t bufsize_) :
+            read_pos (NULL),
+            to_read (0),
+            next (NULL),
+            bufsize (bufsize_)
+        {
+            buf = (unsigned char*) malloc (bufsize_);
+            zmq_assert (buf);
+        }
+
+        //  The destructor doesn't have to be virtual. It is mad virtual
+        //  just to keep ICC and code checking tools from complaining.
+        inline virtual ~decoder_t ()
+        {
+            free (buf);
+        }
+
+        //  Returns a buffer to be filled with binary data.
+        inline void get_buffer (unsigned char **data_, size_t *size_)
+        {
+            //  If we are expected to read large message, we'll opt for zero-
+            //  copy, i.e. we'll ask caller to fill the data directly to the
+            //  message. Note that subsequent read(s) are non-blocking, thus
+            //  each single read reads at most SO_RCVBUF bytes at once not
+            //  depending on how large is the chunk returned from here.
+            //  As a consequence, large messages being received won't block
+            //  other engines running in the same I/O thread for excessive
+            //  amounts of time.
+            if (to_read >= bufsize) {
+                *data_ = read_pos;
+                *size_ = to_read;
+                return;
+            }
+
+            *data_ = buf;
+            *size_ = bufsize;
+        }
+
+        //  Processes the data in the buffer previously allocated using
+        //  get_buffer function. size_ argument specifies nemuber of bytes
+        //  actually filled into the buffer. Function returns number of
+        //  bytes actually processed.
+        inline size_t process_buffer (unsigned char *data_, size_t size_)
+        {
+            //  In case of zero-copy simply adjust the pointers, no copying
+            //  is required. Also, run the state machine in case all the data
+            //  were processed.
+            if (data_ == read_pos) {
+                read_pos += size_;
+                to_read -= size_;
+
+                while (!to_read)
+                    if (!(static_cast <T*> (this)->*next) ())
+                        return size_;
+                return size_;
+            }
+
+            size_t pos = 0;
+            while (true) {
+
+                //  Try to get more space in the message to fill in.
+                //  If none is available, return.
+                while (!to_read)
+                    if (!(static_cast <T*> (this)->*next) ())
+                        return pos;
+
+                //  If there are no more data in the buffer, return.
+                if (pos == size_)
+                    return pos;
+
+                //  Copy the data from buffer to the message.
+                size_t to_copy = std::min (to_read, size_ - pos);
+                memcpy (read_pos, data_ + pos, to_copy);
+                read_pos += to_copy;
+                pos += to_copy;
+                to_read -= to_copy;
+            }
+        }
+
+    protected:
+
+        //  Prototype of state machine action. Action should return false if
+        //  it is unable to push the data to the system.
+        typedef bool (T::*step_t) ();
+
+        //  This function should be called from derived class to read data
+        //  from the buffer and schedule next state machine action.
+        inline void next_step (void *read_pos_, size_t to_read_,
+            step_t next_)
+        {
+            read_pos = (unsigned char*) read_pos_;
+            to_read = to_read_;
+            next = next_;
+        }
+
+    private:
+
+        unsigned char *read_pos;
+        size_t to_read;
+        step_t next;
+
+        size_t bufsize;
+        unsigned char *buf;
+
+        decoder_t (const decoder_t&);
+        void operator = (const decoder_t&);
+    };
+
+}
+
+#endif

include/devpoll.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DEVPOLL_HPP_INCLUDED__
+#define __ZMQ_DEVPOLL_HPP_INCLUDED__
+
+#include "platform.hpp"
+
+#if defined ZMQ_HAVE_SOLARIS || ZMQ_HAVE_HPUX
+
+#include <vector>
+
+#include "fd.hpp"
+#include "thread.hpp"
+#include "atomic_counter.hpp"
+
+namespace zmq
+{
+
+    //  Implements socket polling mechanism using the Solaris-specific
+    //  "/dev/poll" interface.
+
+    class devpoll_t
+    {
+    public:
+
+        typedef fd_t handle_t;
+
+        devpoll_t ();
+        ~devpoll_t ();
+
+        //  "poller" concept.
+        handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
+        void rm_fd (handle_t handle_);
+        void set_pollin (handle_t handle_);
+        void reset_pollin (handle_t handle_);
+        void set_pollout (handle_t handle_);
+        void reset_pollout (handle_t handle_);
+        void add_timer (struct i_poll_events *events_);
+        void cancel_timer (struct i_poll_events *events_);
+        int get_load ();
+        void start ();
+        void stop ();
+
+    private:
+
+        //  Main worker thread routine.
+        static void worker_routine (void *arg_);
+
+        //  Main event loop.
+        void loop ();
+
+        //  File descriptor referring to "/dev/poll" pseudo-device.
+        fd_t devpoll_fd;
+
+        struct fd_entry_t
+        {
+            short events;
+            struct i_poll_events *reactor;
+            bool valid;
+            bool accepted;
+        };
+
+        std::vector <fd_entry_t> fd_table;
+
+        typedef std::vector <fd_t> pending_list_t;
+        pending_list_t pending_list;
+
+        //  Pollset manipulation function.
+        void devpoll_ctl (fd_t fd_, short events_);
+
+        //  List of all the engines waiting for the timer event.
+        typedef std::vector <struct i_poll_events*> timers_t;
+        timers_t timers;
+
+        //  If true, thread is in the process of shutting down.
+        bool stopping;
+
+        //  Handle of the physical thread doing the I/O work.
+        thread_t worker;
+
+        //  Load of the poller. Currently number of file descriptors
+        //  registered with the poller.
+        atomic_counter_t load;
+
+        devpoll_t (const devpoll_t&);
+        void operator = (const devpoll_t&);
+    };
+
+}
+
+#endif
+
+#endif

include/encoder.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ENCODER_HPP_INCLUDED__
+#define __ZMQ_ENCODER_HPP_INCLUDED__
+
+#include "platform.hpp"
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#endif
+
+#include <stddef.h>
+#include <string.h>
+#include <stdlib.h>
+#include <algorithm>
+
+#include "err.hpp"
+
+namespace zmq
+{
+
+    //  Helper base class for encoders. It implements the state machine that
+    //  fills the outgoing buffer. Derived classes should implement individual
+    //  state machine actions.
+
+    template <typename T> class encoder_t
+    {
+    public:
+
+        inline encoder_t (size_t bufsize_) :
+            bufsize (bufsize_)
+        {
+            buf = (unsigned char*) malloc (bufsize_);
+            zmq_assert (buf);
+        }
+
+        //  The destructor doesn't have to be virtual. It is mad virtual
+        //  just to keep ICC and code checking tools from complaining.
+        inline virtual ~encoder_t ()
+        {
+            free (buf);
+        }
+
+        //  The function returns a batch of binary data. The data
+        //  are filled to a supplied buffer. If no buffer is supplied (data_
+        //  points to NULL) decoder object will provide buffer of its own.
+        //  If offset is not NULL, it is filled by offset of the first message
+        //  in the batch.If there's no beginning of a message in the batch,
+        //  offset is set to -1.
+        inline void get_data (unsigned char **data_, size_t *size_,
+            int *offset_ = NULL)
+        {
+            unsigned char *buffer = !*data_ ? buf : *data_;
+            size_t buffersize = !*data_ ? bufsize : *size_;
+
+            size_t pos = 0;
+            if (offset_)
+                *offset_ = -1;
+
+            while (true) {
+
+                //  If there are no more data to return, run the state machine.
+                //  If there are still no data, return what we already have
+                //  in the buffer.
+                if (!to_write) {
+                    if (!(static_cast <T*> (this)->*next) ()) {
+                        *data_ = buffer;
+                        *size_ = pos;
+                        return;
+                    }
+
+                    //  If beginning of the message was processed, adjust the
+                    //  first-message-offset.
+                    if (beginning) { 
+                        if (offset_ && *offset_ == -1)
+                            *offset_ = pos;
+                        beginning = false;
+                    }
+                }
+
+                //  If there are no data in the buffer yet and we are able to
+                //  fill whole buffer in a single go, let's use zero-copy.
+                //  There's no disadvantage to it as we cannot stuck multiple
+                //  messages into the buffer anyway. Note that subsequent
+                //  write(s) are non-blocking, thus each single write writes
+                //  at most SO_SNDBUF bytes at once not depending on how large
+                //  is the chunk returned from here.
+                //  As a consequence, large messages being sent won't block
+                //  other engines running in the same I/O thread for excessive
+                //  amounts of time.
+                if (!pos && !*data_ && to_write >= buffersize) {
+                    *data_ = write_pos;
+                    *size_ = to_write;
+                    write_pos = NULL;
+                    to_write = 0;
+                    return;
+                }
+
+                //  Copy data to the buffer. If the buffer is full, return.
+                size_t to_copy = std::min (to_write, buffersize - pos);
+                memcpy (buffer + pos, write_pos, to_copy);
+                pos += to_copy;
+                write_pos += to_copy;
+                to_write -= to_copy;
+                if (pos == buffersize) {
+                    *data_ = buffer;
+                    *size_ = pos;
+                    return;
+                }
+            }
+        }
+
+    protected:
+
+        //  Prototype of state machine action.
+        typedef bool (T::*step_t) ();
+
+        //  This function should be called from derived class to write the data
+        //  to the buffer and schedule next state machine action. Set beginning
+        //  to true when you are writing first byte of a message.
+        inline void next_step (void *write_pos_, size_t to_write_,
+            step_t next_, bool beginning_)
+        {
+            write_pos = (unsigned char*) write_pos_;
+            to_write = to_write_;
+            next = next_;
+            beginning = beginning_;
+        }
+
+    private:
+
+        unsigned char *write_pos;
+        size_t to_write;
+        step_t next;
+        bool beginning;
+
+        size_t bufsize;
+        unsigned char *buf;
+
+        encoder_t (const encoder_t&);
+        void operator = (const encoder_t&);
+    };
+
+}
+
+#endif

include/epoll.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_EPOLL_HPP_INCLUDED__
+#define __ZMQ_EPOLL_HPP_INCLUDED__
+
+#include "platform.hpp"
+
+#ifdef ZMQ_HAVE_LINUX
+
+#include <vector>
+#include <sys/epoll.h>
+
+#include "fd.hpp"
+#include "thread.hpp"
+#include "atomic_counter.hpp"
+
+namespace zmq
+{
+
+    //  This class implements socket polling mechanism using the Linux-specific
+    //  epoll mechanism.
+
+    class epoll_t
+    {
+    public:
+
+        typedef void* handle_t;
+
+        epoll_t ();
+        ~epoll_t ();
+
+        //  "poller" concept.
+        handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
+        void rm_fd (handle_t handle_);
+        void set_pollin (handle_t handle_);
+        void reset_pollin (handle_t handle_);
+        void set_pollout (handle_t handle_);
+        void reset_pollout (handle_t handle_);
+        void add_timer (struct i_poll_events *events_);
+        void cancel_timer (struct i_poll_events *events_);
+        int get_load ();
+        void start ();
+        void stop ();
+
+    private:
+
+        //  Main worker thread routine.
+        static void worker_routine (void *arg_);
+
+        //  Main event loop.
+        void loop ();
+
+        //  Main epoll file descriptor
+        fd_t epoll_fd;
+
+        struct poll_entry_t
+        {
+            fd_t fd;
+            epoll_event ev;
+            struct i_poll_events *events;
+        };
+
+        //  List of retired event sources.
+        typedef std::vector <poll_entry_t*> retired_t;
+        retired_t retired;
+
+        //  List of all the engines waiting for the timer event.
+        typedef std::vector <struct i_poll_events*> timers_t;
+        timers_t timers;
+
+        //  If true, thread is in the process of shutting down.
+        bool stopping;
+
+        //  Handle of the physical thread doing the I/O work.
+        thread_t worker;
+
+        //  Load of the poller. Currently number of file descriptors
+        //  registered with the poller.
+        atomic_counter_t load;
+
+        epoll_t (const epoll_t&);
+        void operator = (const epoll_t&);
+    };
+
+}
+
+#endif
+
+#endif
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ERR_HPP_INCLUDED__
+#define __ZMQ_ERR_HPP_INCLUDED__
+
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "platform.hpp"
+#include "likely.hpp"
+
+#ifdef ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include <netdb.h>
+#endif
+
+#ifdef ZMQ_HAVE_WINDOWS
+
+namespace zmq
+{
+
+    const char *wsa_error ();
+    void win_error (char *buffer_, size_t buffer_size_);
+    void wsa_error_to_errno ();
+  
+}
+
+//  Provides convenient way to check WSA-style errors on Windows.
+#define wsa_assert(x) \
+    do {\
+        if (unlikely (!(x))) {\
+            const char *errstr = zmq::wsa_error ();\
+            if (errstr != NULL) {\
+                fprintf (stderr, "Assertion failed: %s (%s:%d)\n", errstr, \
+                    __FILE__, __LINE__);\
+                abort ();\
+            }\
+        }\
+    } while (false)
+
+// Provides convenient way to check GetLastError-style errors on Windows.
+#define win_assert(x) \
+    do {\
+        if (unlikely (!(x))) {\
+            char errstr [256];\
+            zmq::win_error (errstr, 256);\
+            fprintf (stderr, "Assertion failed: %s (%s:%d)\n", errstr, \
+                __FILE__, __LINE__);\
+            abort ();\
+        }\
+    } while (false)
+
+#endif
+
+//  This macro works in exactly the same way as the normal assert. It is used
+//  in its stead because standard assert on Win32 in broken - it prints nothing
+//  when used within the scope of JNI library.
+#define zmq_assert(x) \
+    do {\
+        if (unlikely (!(x))) {\
+            fprintf (stderr, "Assertion failed: %s (%s:%d)\n", #x, \
+                __FILE__, __LINE__);\
+            abort ();\
+        }\
+    } while (false) 
+
+//  Provides convenient way to check for errno-style errors.
+#define errno_assert(x) \
+    do {\
+        if (unlikely (!(x))) {\
+            perror (NULL);\
+            fprintf (stderr, "%s (%s:%d)\n", #x, __FILE__, __LINE__);\
+            abort ();\
+        }\
+    } while (false)
+
+// Provides convenient way to check for POSIX errors.
+#define posix_assert(x) \
+    do {\
+        if (unlikely (x)) {\
+            fprintf (stderr, "%s (%s:%d)\n", strerror (x), __FILE__, __LINE__);\
+            abort ();\
+        }\
+    } while (false)
+
+// Provides convenient way to check for errors from getaddrinfo.
+#define gai_assert(x) \
+    do {\
+        if (unlikely (x)) {\
+            const char *errstr = gai_strerror (x);\
+            fprintf (stderr, "%s (%s:%d)\n", errstr, __FILE__, __LINE__);\
+            abort ();\
+        }\
+    } while (false)
+
+#endif
+
+#define zmq_not_implemented() \
+    do {\
+        fprintf (stderr, "Hic sunt leones (%s:%d)\n", __FILE__, __LINE__);\
+        abort ();\
+    } while (false)
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_FD_HPP_INCLUDED__
+#define __ZMQ_FD_HPP_INCLUDED__
+
+#include "platform.hpp"
+
+#ifdef ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#endif
+
+namespace zmq
+{
+#ifdef ZMQ_HAVE_WINDOWS
+#if defined _MSC_VER &&_MSC_VER <= 1400
+    typedef UINT_PTR fd_t;
+    enum {retired_fd = (fd_t)(~0)};
+#else
+    typedef SOCKET fd_t;
+    enum {retired_fd = INVALID_SOCKET};
+#endif
+#else
+    typedef int fd_t;
+    enum {retired_fd = -1};
+#endif
+}
+#endif

include/forwarder.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_FORWARDER_HPP_INCLUDED__
+#define __ZMQ_FORWARDER_HPP_INCLUDED__
+
+namespace zmq
+{
+
+    int forwarder (class socket_base_t *insocket_,
+        class socket_base_t *outsocket_);
+
+}
+
+#endif
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_FQ_HPP_INCLUDED__
+#define __ZMQ_FQ_HPP_INCLUDED__
+
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+    //  Class manages a set of inbound pipes. On receive it performs fair
+    //  queueing (RFC970) so that senders gone berserk won't cause denial of
+    //  service for decent senders.
+    class fq_t
+    {
+    public:
+
+        fq_t ();
+        ~fq_t ();
+
+        void attach (class reader_t *pipe_);
+        void detach (class reader_t *pipe_);
+        void kill (class reader_t *pipe_);
+        void revive (class reader_t *pipe_);
+        int recv (zmq_msg_t *msg_, int flags_);
+        bool has_in ();
+
+    private:
+
+        //  Inbound pipes.
+        typedef yarray_t <class reader_t> pipes_t;
+        pipes_t pipes;
+
+        //  Number of active pipes. All the active pipes are located at the
+        //  beginning of the pipes array.
+        pipes_t::size_type active;
+
+        //  Index of the next bound pipe to read a message from.
+        pipes_t::size_type current;
+
+        //  If true, part of a multipart message was already received, but
+        //  there are following parts still waiting in the current pipe.
+        bool more;
+
+        fq_t (const fq_t&);
+        void operator = (const fq_t&);
+    };
+
+}
+
+#endif

include/i_endpoint.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+
+#include "blob.hpp"
+
+namespace zmq
+{
+
+    struct i_endpoint
+    {
+        virtual ~i_endpoint () {}
+
+        virtual void attach_pipes (class reader_t *inpipe_,
+            class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
+        virtual void detach_inpipe (class reader_t *pipe_) = 0;
+        virtual void detach_outpipe (class writer_t *pipe_) = 0;
+        virtual void kill (class reader_t *pipe_) = 0;
+        virtual void revive (class reader_t *pipe_) = 0;
+        virtual void revive (class writer_t *pipe_) = 0;
+    };
+
+}
+
+#endif

include/i_engine.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
+#define __ZMQ_I_ENGINE_HPP_INCLUDED__
+
+#include <stddef.h>
+
+namespace zmq
+{
+
+    struct i_engine
+    {
+        virtual ~i_engine () {}
+
+        //  Plug the engine to the session.
+        virtual void plug (struct i_inout *inout_) = 0;
+
+        //  Unplug the engine from the session.
+        virtual void unplug () = 0;
+
+        //  This method is called by the session to signalise that there
+        //  are messages to send available.
+        virtual void revive () = 0;
+
+        //  This method is called by the session to signalise that more
+        //  messages can be written to the pipe.
+        virtual void resume_input () = 0;
+    };
+
+}
+
+#endif

include/i_inout.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__
+#define __ZMQ_I_INOUT_HPP_INCLUDED__
+
+#include "../include/zmq.h"
+
+#include "stdint.hpp"
+
+namespace zmq
+{
+
+    struct i_inout
+    {
+        virtual ~i_inout () {}
+
+        //  Engine asks to get a message to send to the network.
+        virtual bool read (::zmq_msg_t *msg_) = 0;
+
+        //  Engine sends the incoming message further on downstream.
+        virtual bool write (::zmq_msg_t *msg_) = 0;
+
+        //  Flush all the previously written messages downstream.
+        virtual void flush () = 0;
+    
+        //  Drop all the references to the engine. The parameter is the object
+        //  to use to reconnect. If reconnection is not required, the argument
+        //  is set to NULL.
+        virtual void detach (class owned_t *reconnecter_) = 0;
+
+        //  Returns least loaded I/O thread.
+        virtual class io_thread_t *get_io_thread () = 0;
+
+        //  Return pointer to the owning socket.
+        virtual class socket_base_t *get_owner () = 0;
+
+        //  Return ordinal number of the session.
+        virtual uint64_t get_ordinal () = 0;
+    };
+
+}
+
+#endif

include/i_poll_events.hpp

+/*
+Copyright (c) 2007-2010 iMatix Corporation
+ 
+This file is part of 0MQ.
+ 
+0MQ is free software; you can redistribute it and/or modify it under
+the terms of the Lesser GNU General Public License as published by
+the Free Software Foundation; either version 3 of the License, or
+(at your option) any later version.
+ 
+0MQ is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+Lesser GNU General Public License for more details.
+ 
+You should have received a copy of the Lesser GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+ 
+#ifndef __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
+#define __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
+ 
+namespace zmq
+{
+ 
+    // Virtual interface to be exposed by object that want to be notified
+    // about events on file descriptors.
+ 
+    struct i_poll_events
+    {
+        virtual ~i_poll_events () {}
+ 
+        // Called by I/O thread when file descriptor is ready for reading.
+        virtual void in_event () = 0;
+ 
+        // Called by I/O thread when file descriptor is ready for writing.
+        virtual void out_event () = 0;
+ 
+        // Called when timer expires.
+        virtual void timer_event () = 0;
+    };
+ 
+}
+ 
+#endif

include/io_object.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__
+#define __ZMQ_IO_OBJECT_HPP_INCLUDED__
+
+#include <stddef.h>
+
+#include "stdint.hpp"
+#include "poller.hpp"
+#include "i_poll_events.hpp"
+
+namespace zmq
+{
+
+    //  Simple base class for objects that live in I/O threads.
+    //  It makes communication with the poller object easier and
+    //  makes defining unneeded event handlers unnecessary.
+
+    class io_object_t : public i_poll_events
+    {
+    public:
+
+        io_object_t (class io_thread_t *io_thread_ = NULL);
+        ~io_object_t ();
+
+    protected:
+
+        typedef poller_t::handle_t handle_t;
+
+        //  Derived class can init/swap the underlying I/O thread.
+        //  Caution: Remove all the file descriptors from the old I/O thread
+        //  before swapping to the new one!
+        void set_io_thread (class io_thread_t *io_thread_);
+
+        //  Methods to access underlying poller object.
+        handle_t add_fd (fd_t fd_);
+        void rm_fd (handle_t handle_);
+        void set_pollin (handle_t handle_);
+        void reset_pollin (handle_t handle_);
+        void set_pollout (handle_t handle_);
+        void reset_pollout (handle_t handle_);
+        void add_timer ();
+        void cancel_timer ();
+
+        //  i_poll_events interface implementation.
+        void in_event ();
+        void out_event ();
+        void timer_event ();
+
+    private:
+
+        poller_t *poller;
+
+        io_object_t (const io_object_t&);
+        void operator = (const io_object_t&);
+    };
+
+}
+
+#endif

include/io_thread.hpp

+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_IO_THREAD_HPP_INCLUDED__
+#define __ZMQ_IO_THREAD_HPP_INCLUDED__
+
+#include <vector>
+
+#include "stdint.hpp"
+#include "object.hpp"
+#include "poller.hpp"
+#include "i_poll_events.hpp"
+#include "signaler.hpp"
+
+namespace zmq
+{
+
+    //  Generic part of the I/O thread. Polling-mechanism-specific features
+    //  are implemented in separate "polling objects".
+
+    class io_thread_t : public object_t, public i_poll_events
+    {
+    public:
+
+        io_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
+
+        //  Clean-up. If the thread was started, it's neccessary to call 'stop'
+        //  before invoking destructor. Otherwise the destructor would hang up.
+        ~io_thread_t ();
+
+        //  Launch the physical thread.
+        void start ();
+
+        //  Ask underlying thread to stop.
+        void stop ();
+
+        //  Returns signaler associated with this I/O thread.
+        signaler_t *get_signaler ();
+
+        //  i_poll_events implementation.
+        void in_event ();
+        void out_event ();
+        void timer_event ();
+
+        //  Used by io_objects to retrieve the assciated poller object.
+        poller_t *get_poller ();
+
+        //  Command handlers.
+        void process_stop ();
+
+        //  Returns load experienced by the I/O thread.
+        int get_load ();
+
+    private:
+
+        //  Poll thread gets notifications about incoming commands using
+        //  this signaler.
+        signaler_t signaler;
+
+        //  Handle associated with signaler's file descriptor.
+        poller_t::handle_t signaler_handle;
+
+        //  I/O multiplexing is performed using a poller object.
+        poller_t *poller;
+    };
+
+}
+
+#endif
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_IP_HPP_INCLUDED__
+#define __ZMQ_IP_HPP_INCLUDED__
+
+#include "platform.hpp"
+
+#ifdef ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#endif
+
+#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
+#include <sys/un.h>
+#endif
+
+//  Some platforms (notably Darwin/OSX and NetBSD) do not define all AI_
+//  flags for getaddrinfo(). This can be worked around safely by defining
+//  these to 0.
+#ifndef AI_ADDRCONFIG
+#define AI_ADDRCONFIG 0
+#endif
+#ifndef AI_NUMERICSERV
+#define AI_NUMERICSERV 0
+#endif
+
+namespace zmq
+{