Rhys ! avatar Rhys ! committed 49f8ab7 Draft

first commit

Comments (0)

Files changed (9)

+.idea
+target/
+ZPirate.iml
+dependency-reduced-pom.xml
+module_zpirate.xml
+zpirate.properties
+zpirate.xml
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.sonatype.oss</groupId>
+		<artifactId>oss-parent</artifactId>
+		<version>7</version>
+	</parent>
+
+	<groupId>org.lyrcl.app.pirate</groupId>
+	<artifactId>zpirate</artifactId>
+	<version>0.1.0-SNAPSHOT</version>
+	<packaging>jar</packaging>
+	<name>zpirate</name>
+
+	<description>
+	Implementation of the Paranoid Pirate Protocol for ZeroMQ 2.2.
+    </description>
+
+	<scm>
+		<connection>scm:hg:https://bitbucket.org/rimmington/zpirate</connection>
+		<url>https://bitbucket.org/rimmington/zpirate</url>
+	</scm>
+
+	<licenses>
+		<license>
+			<name>GNU GPL 3</name>
+			<url>http://www.gnu.org/licenses/gpl.txt</url>
+		</license>
+	</licenses>
+
+	<developers>
+		<developer>
+			<id>rimmington</id>
+			<name>Rhys !</name>
+			<email>rimmington@gmail.com</email>
+		</developer>
+	</developers>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<maven.compiler.source>1.6</maven.compiler.source>
+		<maven.compiler.target>1.6</maven.compiler.target>
+
+		<jzmq.version>1.1.0-SNAPSHOT</jzmq.version>
+		<native.os>${os.name}</native.os>
+		<native.arch>${os.arch}</native.arch>
+	</properties>
+
+	<profiles>
+		<profile>
+			<id>Windows</id>
+			<activation>
+				<os>
+					<family>windows</family>
+				</os>
+			</activation>
+			<properties>
+				<native.os>Windows</native.os>
+			</properties>
+		</profile>
+	</profiles>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.zeromq</groupId>
+			<artifactId>jzmq</artifactId>
+			<version>${jzmq.version}</version>
+			<scope>compile</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.zeromq</groupId>
+			<artifactId>jzmq</artifactId>
+			<version>${jzmq.version}</version>
+			<classifier>native-${native.arch}-${native.os}</classifier>
+		</dependency>
+	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>jzmq-mv</id>
+			<url>https://bitbucket.org/rimmington/jzmq-mv/raw/default/snapshots</url>
+		</repository>
+	</repositories>
+
+	<build>
+		<sourceDirectory>src</sourceDirectory>
+
+		<plugins>
+
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>buildnumber-maven-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>validate</phase>
+						<goals>
+							<goal>create</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<doCheck>false</doCheck>
+					<doUpdate>false</doUpdate>
+					<timestampPropertyName>buildTimestamp</timestampPropertyName>
+					<timestampFormat>{0,date,yyyy-MM-dd HH:mm Z}</timestampFormat>
+				</configuration>
+			</plugin>
+      
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<inherited>true</inherited>
+				<configuration>
+					<archive>                   
+						<manifest>
+							<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+						</manifest>
+						<manifestEntries>
+							<Implementation-Version>${buildNumber} ${buildTimestamp}</Implementation-Version>
+						</manifestEntries>
+					</archive>
+				</configuration>
+			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<manifestEntries>
+										<Main-Class>org.lyrcl.lib.pirate.Test</Main-Class>
+									</manifestEntries>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+      		<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-source-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>attach-sources</id>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-javadoc-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>attach-javadoc</id>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+		</plugins>
+
+	</build>
+
+</project>

src/org/lyrcl/lib/pirate/PSocket.java

+package org.lyrcl.lib.pirate;
+
+import java.nio.charset.Charset;
+
+import org.zeromq.ZMQ;
+import org.zeromq.ZMsg;
+
+/**
+ * ZeroMQ 2.2 socket convenience class.
+ */
+public class PSocket {
+    /**
+     * The charset to use for string (un)subscriptions.
+     */
+	private static final Charset UTF8 = Charset.forName("UTF8");
+
+    /**
+     * The underlying ZMQ socket.
+     */
+	private final ZMQ.Socket socket;
+
+    /**
+     * The ZMQ context of the underlying socket.
+     */
+    private final ZMQ.Context context;
+
+    /**
+     * The timeout to receive a message.
+     */
+    private int recvTimeout = -1;
+
+    /**
+     * The timeout to send a message.
+     */
+    private int sendTimeout = -1;
+
+    /**
+     * Initialise a new PSocket instance using the specified context with
+     * the specified type.
+     *
+     * @param context The ZMQ context of this socket.
+     * @param type The type of this socket.
+     */
+    public PSocket(ZMQ.Context context, int type) {
+    	this.context = context;
+    	this.socket = context.socket(type);
+    }
+
+    /**
+     * ZeroMQ 2.2 poller convenience class.
+     */
+    public static class Poller {
+        /**
+         * The underlying ZMQ poller.
+         */
+    	private final ZMQ.Poller poller;
+
+        /**
+         * Initialise a Poller instance that wraps the specified ZMQ.Poller.
+         *
+         * @param poller The ZMQ.Poller to wrap.
+         */
+    	private Poller(ZMQ.Poller poller) {
+    		this.poller = poller;
+    	}
+
+        /**
+         * Also listen for incoming messages on the specified PSocket.
+         *
+         * @param socket The additional socket to listen on.
+         * @return This (for chaining).
+         */
+    	public Poller with(PSocket socket) {
+    		this.poller.register(socket.socket, ZMQ.Poller.POLLIN);
+    		return this;
+    	}
+
+        /**
+         * Get the underlying ZMQ Poller.
+         *
+         * @return The underlying ZMQ Poller.
+         */
+    	public ZMQ.Poller poller() {
+    		return this.poller;
+    	}
+
+        /**
+         * Poll for incoming messages with no timeout.
+         *
+         * @return The number of sockets with new messages, or -1 if an error
+         *         occurred.
+         */
+        public long poll() {
+            return this.poll(-1);
+        }
+
+        /**
+         * Poll for incoming messages using the specified timeout.
+         *
+         * @param timeout The maximum time to wait, in milliseconds.
+         * @return The number of sockets with new messages, or -1 if an error
+         *         occurred.
+         */
+        public long poll(int timeout) {
+            long microTimeout = timeout == -1 ? -1 : ((long)timeout * 1000);
+            return this.poller.poll(microTimeout);
+        }
+    }
+
+    /**
+     * Send a one-part message containing the specified bytes.
+     *
+     * @param bytes The message contents.
+     * @return true if send was successful, false otherwise.
+     */
+    public Boolean send(byte[] bytes) {
+    	return this.send(bytes, -1);
+    }
+
+    /**
+     * Send a one-part message containing the specified bytes with the
+     * specified timeout.
+     *
+     * @param bytes The message contents.
+     * @param timeout Send timeout, in milliseconds.
+     * @return true if send was successful, false otherwise.
+     */
+    public Boolean send(byte[] bytes, int timeout) {
+        return this.send(bytes, timeout, true);
+    }
+
+    /**
+     * Send a message containing the specified bytes with the specified
+     * timeout.
+     *
+     * @param bytes The message contents.
+     * @param timeout Send timeout, in milliseconds.
+     * @param last true if this is a single-part message or the last frame of
+     *             a multi-part message, false otherwise.
+     * @return true if send was successful, false otherwise.
+     */
+    public Boolean send(byte[] bytes, int timeout, boolean last) {
+        int flags = last ? 0 : ZMQ.SNDMORE;
+    	this.setSendTimeoutIfNeeded(timeout);
+        return this.socket.send(bytes, flags);
+    }
+
+    /**
+     * Set the send timeout of the underlying socket if it is different to the
+     * current timeout.
+     *
+     * @param timeout The send timeout, in milliseconds.
+     */
+    private void setSendTimeoutIfNeeded(int timeout) {
+    	if (this.sendTimeout != timeout) {
+    		this.socket.setSendTimeOut(timeout);
+    		this.sendTimeout = timeout;
+    	}
+    }
+
+    /**
+     * Receive a single-part message.
+     *
+     * @return The contents of a single-part message, or null if interrupted.
+     */
+    public byte[] recv() {
+    	return this.recv(-1);
+    }
+
+    /**
+     * Receive a single-part message with the specified timeout.
+     *
+     * @param timeout The receive timeout, in milliseconds.
+     * @return The contents of a single-part message, or null if interrupted
+     *         or timed out.
+     */
+    public byte[] recv(int timeout) {
+    	this.setRecvTimeoutIfNeeded(timeout);
+    	return this.socket.recv(0);
+    }
+
+    /**
+     * Receive a multi-part message.
+     *
+     * @return The multi-part message, or null if interrupted.
+     */
+    public ZMsg recvMulti() {
+    	return this.recvMulti(-1);
+    }
+
+    /**
+     * Receive a multi-part message with the specified timeout.
+     *
+     * @param timeout The receive timeout, in milliseconds.
+     * @return The multi-part message, or null if interrupted or timed out.
+     */
+    public ZMsg recvMulti(int timeout) {
+    	this.setRecvTimeoutIfNeeded(timeout);
+    	return ZMsg.recvMsg(this.socket);
+    }
+
+    /**
+     * Set the receive timeout to the specified number of milliseconds.
+     *
+     * @param timeout The receive timeout in milliseconds.
+     */
+    private void setRecvTimeoutIfNeeded(int timeout) {
+    	if (this.recvTimeout != timeout) {
+    		this.socket.setReceiveTimeOut(timeout);
+    		this.recvTimeout = timeout;
+    	}
+    }
+
+    /**
+     * Bind to the specified endpoint.
+     *
+     * @param endpoint The endpoint to bind to.
+     * @return This (for chaining).
+     */
+    public PSocket bind(String endpoint) {
+        this.socket.bind(endpoint);
+        return this;
+    }
+
+    /**
+     * Connect to the specified endpoint.
+     *
+     * @param endpoint The endpoint to bind to.
+     * @return This (for chaining).
+     */
+    public PSocket connect(String endpoint) {
+        this.socket.connect(endpoint);
+        return this;
+    }
+
+    /**
+     * Subscribe to messages starting with the specified string prefix, encoded
+     * as ASCII/UTF-8.
+     *
+     * @param prefix The string prefix.
+     * @return This (for chaining).
+     */
+    public PSocket sub(String prefix) {
+    	return this.sub(prefix.getBytes(PSocket.UTF8));
+    }
+
+    /**
+     * Subscribe to all messages.
+     *
+     * @return This (for chaining).
+     */
+    public PSocket sub() {
+        return this.sub(new byte[0]);
+    }
+
+    /**
+     * Subscribe to messages starting with the specified bytes.
+     *
+     * @param prefix The byte prefix.
+     * @return This (for chaining).
+     */
+    public PSocket sub(byte[] prefix) {
+    	this.socket.subscribe(prefix);
+    	return this;
+    }
+
+    /**
+     * Remove a subscription to messages starting with the specified string
+     * prefix, encoded as ASCII/UTF-8.
+     *
+     * @param prefix The string prefix.
+     * @return This (for chaining).
+     */
+    public PSocket unsub(String prefix) {
+        return this.unsub(prefix.getBytes(PSocket.UTF8));
+    }
+
+    /**
+     * Remove a subscription to all messages.
+     *
+     * @return This (for chaining).
+     */
+    public PSocket unsub() {
+        return this.unsub(new byte[0]);
+    }
+
+    /**
+     * Remove a subscription to messages starting with the specified bytes.
+     *
+     * @param prefix The byte prefix.
+     * @return This (for chaining).
+     */
+    public PSocket unsub(byte[] prefix) {
+    	this.socket.unsubscribe(prefix);
+    	return this;
+    }
+
+    /**
+     * Drop all outgoing messages and connects and close this socket.
+     */
+    public void dropAndClose() {
+        this.socket.setLinger(0);
+        this.socket.close();
+    }
+
+    /**
+     * Close this socket, waiting for all outgoing messages and connects to
+     * complete.
+     */
+    public void close() {
+        this.socket.close();
+    }
+
+    /**
+     * Check for incoming messages detected by a previous poll.
+     *
+     * @return true if messages are waiting to be received, false otherwise.
+     */
+    public Boolean pollIn() {
+        return ((int)this.socket.getEvents() & ZMQ.Poller.POLLIN) > 0;
+    }
+
+    /**
+     * Create a poller that polls for incoming messages on this instance.
+     *
+     * @return A poller that listens for incoming messages on this instance.
+     */
+    public PSocket.Poller poller() {
+    	return new Poller(this.context.poller()).with(this);
+    }
+
+    /**
+     * Create a poller that polls for incoming messages on this instance and
+     * the specified socket.
+     *
+     * @param socket The other socket to listen on.
+     * @return A poller that polls for incoming messages on this instance and
+     *         the specified socket.
+     */
+    public PSocket.Poller with(PSocket socket) {
+    	return this.poller().with(socket);
+    }
+
+    /**
+     * Get the underlying ZMQ.Socket.
+     *
+     * @return The underlying ZMQ.Socket.
+     */
+    public ZMQ.Socket socket() {
+        return socket;
+    }
+}

src/org/lyrcl/lib/pirate/PirateClient.java

+package org.lyrcl.lib.pirate;
+
+import org.zeromq.ZMQ;
+
+/**
+ * ZeroMQ 2.2 paranoid pirate client API.
+ */
+public class PirateClient {
+    /**
+     * The maximum time to wait for a reply to each attempt, in milliseconds.
+     */
+	private final int timeout;
+
+    /**
+     * The number of times to try a request before giving up.
+     */
+	private final int retries;
+
+    /**
+     * The endpoint to communicate with the queue on.
+     */
+	private final String queueEndpoint;
+
+    /**
+     * This client's ZMQ context.
+     */
+    private final ZMQ.Context context;
+
+    /**
+     * The socket used to communicate with the queue.
+     */
+	private PSocket socket;
+
+    /**
+     * Initialise a new paranoid pirate client instance attached to the specified
+     * endpoint with the default timeout and performance settings.
+     *
+     * @param queueEndpoint The endpoint to communicate with the queue on.
+     */
+	public PirateClient(String queueEndpoint) {
+		this(queueEndpoint, 2500, 3, 1);
+	}
+
+    /**
+     * Initialise a new paranoid pirate client instance attached to the specified
+     * endpoint with the specified timeout and performance settings.
+     *
+     * @param queueEndpoint The endpoint to communicate with the queue on.
+     * @param timeout The maximum time to wait for a reply to each attempt,
+     *                in milliseconds.
+     * @param retries The number of times to try a request before giving up.
+     * @param ioThreads The number of ZeroMQ I/O threads to use. Allocate
+     *                  around 1 thread for each gigabyte/second of data.
+     */
+	public PirateClient(String queueEndpoint, int timeout, int retries, int ioThreads) {
+		this.timeout = timeout;
+		this.retries = retries;
+		this.queueEndpoint = queueEndpoint;
+
+		this.context = ZMQ.context(ioThreads);
+	}
+
+    /**
+     * Make a synchronous request.
+     *
+     * @param request The request message.
+     * @return The reply, or null for a timeout.
+     */
+	public byte[] req(byte[] request) {
+		for (int i = 0; i < this.retries; i++) {
+			this.connectIfNeeded();
+			this.socket.send(request, 0);
+            byte[] response = this.socket.recv(this.timeout);
+			if (response != null) {
+				return response;
+			} else {
+				this.socket.dropAndClose();
+				this.socket = null;
+			}
+		}
+
+		return null;
+	}
+
+    /**
+     * Finish and close communication with the queue.
+     */
+	public void close() {
+		if (this.socket != null) {
+			this.socket.close();
+		}
+
+		this.context.term();
+	}
+
+    /**
+     * Connect to the queue if not already.
+     */
+	private void connectIfNeeded() {
+		if (this.socket == null) {
+			this.socket = new PSocket(this.context, ZMQ.REQ);
+			this.socket.connect(this.queueEndpoint);
+		}
+	}
+}

src/org/lyrcl/lib/pirate/PirateProtocol.java

+package org.lyrcl.lib.pirate;
+
+/**
+ * Pirate protocol constants.
+ */
+final class PirateProtocol {
+    /**
+     * The bytes signifying a new worker is ready for work.
+     */
+	public static byte[] READY = new byte[] { 1 };
+
+    /**
+     * The bytes signifying an existing component is still alive.
+     */
+	public static byte[] HEARTBEAT = new byte[] { 2 };
+}

src/org/lyrcl/lib/pirate/PirateQueue.java

+package org.lyrcl.lib.pirate;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.logging.Logger;
+
+import org.zeromq.ZFrame;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMsg;
+
+/**
+ * ZeroMQ 2.2 paranoid pirate queue implementation.
+ */
+public class PirateQueue implements Runnable {
+    /**
+     * The logger for this class.
+     */
+    private static final Logger LOGGER =
+            Logger.getLogger(PirateQueue.class.getName());
+
+    /**
+     * The number of heartbeats to wait before declaring a worker dead.
+     */
+	private final int retries;
+
+    /**
+     * The time between heartbeats to and from workers, in nanoseconds.
+     */
+	private final int heartbeatInterval;
+
+    /**
+     * The endpoint to communicate with clients on.
+     */
+	private final String clientEndpoint;
+
+    /**
+     * The endpoint to communicate with workers on.
+     */
+	private final String workerEndpoint;
+
+    /**
+     * This queue's ZMQ context.
+     */
+	private final ZMQ.Context context;
+
+    /**
+     * A remote worker with an address and expiry.
+     */
+	private class Worker {
+        /**
+         * The address of the worker (used in message wrapping).
+         */
+		public byte[] address;
+
+        /**
+         * The time when the worker will be removed from the queue due to
+         * inactivity, in nanoseconds.
+         *
+         * @see System.nanoTime()
+         */
+		public long expiry;
+
+        /**
+         * Initialise a Worker instance with the specified address and
+         * an automatically calculated expiry.
+         *
+         * @param address The address of the worker.
+         */
+		public Worker(byte[] address) {
+			this.address = address;
+			this.expiry = System.currentTimeMillis() +
+                    PirateQueue.this.retries * PirateQueue.this.heartbeatInterval;
+		}
+
+		@Override
+		public int hashCode() {
+			return Arrays.hashCode(this.address);
+		}
+
+		@Override
+		public boolean equals(Object other) {
+			return other instanceof Worker &&
+				Arrays.equals(this.address,
+					((Worker) other).address);
+		}
+	}
+
+    /**
+     * A queue of live workers available for work.
+     */
+	private class WorkerQueue {
+        /**
+         * The available workers.
+         */
+		private final Deque<Worker> queue = new ArrayDeque<Worker>();
+
+        /**
+         * The next time to send out heartbeats, in nanoseconds.
+         *
+         * @see System.nanoTime()
+         */
+		private long heartbeatAt = System.nanoTime()
+                + PirateQueue.this.heartbeatInterval;
+
+        /**
+         * Add a worker available for work to the end of the queue.
+         *
+         * @param worker The worker.
+         */
+		public void ready(Worker worker) {
+			this.queue.remove(worker);
+			this.queue.addLast(worker);
+		}
+
+        /**
+         * Pop a worker from the head of the queue and return its address.
+         *
+         * @return The address of a worker available for work.
+         */
+		public byte[] next() {
+			return this.queue.pop().address;
+		}
+
+        /**
+         * Check if the queue is empty.
+         *
+         * @return true if no workers are available for work, false otherwise.
+         */
+		public boolean isEmpty() {
+			return this.queue.isEmpty();
+		}
+
+        /**
+         * Send out heartbeats to available workers and remove expired workers.
+         *
+         * @param workerSock The socket to send heartbeats to workers from.
+         */
+		public void maintain(PSocket workerSock) {
+			if (System.nanoTime() >= this.heartbeatAt) {
+				for (Worker w: this.queue) {
+                    workerSock.send(w.address, -1, false);
+					workerSock.send(PirateProtocol.HEARTBEAT, -1, true);
+				}
+
+				this.heartbeatAt = System.nanoTime() +
+					PirateQueue.this.heartbeatInterval;
+			}
+
+			long time = System.nanoTime();
+			Iterator<Worker> i = this.queue.iterator();
+			while (i.hasNext() && i.next().expiry < time) {
+				i.remove();
+			}
+		}
+	}
+
+    /**
+     * Initialise a new paranoid pirate queue instance attached to the specified
+     * endpoints with the default liveness and performance settings.
+     *
+     * @param clientEndpoint The endpoint to communicate with clients on.
+     * @param workerEndpoint The endpoint to communicate with workers on.
+     */
+	public PirateQueue(String clientEndpoint, String workerEndpoint) {
+		this(clientEndpoint, workerEndpoint, 1000, 3, 1);
+	}
+
+    /**
+     * Initialise a new paranoid pirate queue instance attached to the specified
+     * endpoints with the specified liveness and performance settings.
+     *
+     * @param clientEndpoint The endpoint to communicate with clients on.
+     * @param workerEndpoint The endpoint to communicate with workers on.
+     * @param heartbeatInterval The time between heartbeats to and from workers,
+     *                          in milliseconds.
+     * @param retries The number of heartbeats to wait before declaring a
+     *                worker dead.
+     * @param ioThreads The number of ZeroMQ I/O threads to use. Allocate
+     *                  around 1 thread for each gigabyte/second of data.
+     */
+	public PirateQueue(String clientEndpoint, String workerEndpoint,
+			int heartbeatInterval, int retries, int ioThreads) {
+		this.heartbeatInterval = heartbeatInterval * 1000000;
+		this.retries = retries;
+
+		this.context = ZMQ.context(ioThreads);
+		this.clientEndpoint = clientEndpoint;
+		this.workerEndpoint = workerEndpoint;
+	}
+
+    /**
+     * Run as a paranoid pirate queue until interrupted.
+     */
+    @Override
+	public void run() {
+		PSocket clientSock = new PSocket(this.context, ZMQ.ROUTER);
+		PSocket workerSock = new PSocket(this.context, ZMQ.ROUTER);
+		clientSock.bind(this.clientEndpoint);
+		workerSock.bind(this.workerEndpoint);
+
+		PSocket.Poller pollWorkers = workerSock.poller();
+        PSocket.Poller pollBoth = workerSock.with(clientSock);
+		WorkerQueue workers = new WorkerQueue();
+
+		while (!Thread.currentThread().isInterrupted()) {
+            // No point polling for clients if there's no workers
+            (workers.isEmpty() ? pollWorkers : pollBoth).poll();
+
+			if (workerSock.pollIn()) {
+				ZMsg msg = workerSock.recvMulti();
+				if (msg == null) {
+					break; // interrupted
+				}
+
+				ZFrame address = msg.unwrap();
+				workers.ready(new Worker(address.getData()));
+
+				if (msg.size() == 1) { // control message
+					ZFrame frame = msg.getFirst();
+					if (!Arrays.equals(PirateProtocol.HEARTBEAT, frame.getData()) &&
+							!Arrays.equals(PirateProtocol.READY, frame.getData())) {
+                        LOGGER.warning(
+                                "Invalid control message received from worker: "
+                                        + msg.toString());
+					}
+				} else {
+					msg.send(clientSock.socket());
+				}
+			}
+
+			if (clientSock.pollIn()) {
+				ZMsg msg = clientSock.recvMulti();
+				if (msg == null) {
+					break; // interrupted
+				} else {
+					msg.push(workers.next());
+					msg.send(workerSock.socket());
+				}
+			}
+
+			workers.maintain(workerSock);
+		}
+
+		this.context.term();
+	}
+}

src/org/lyrcl/lib/pirate/PirateWorker.java

+package org.lyrcl.lib.pirate;
+
+import java.util.Arrays;
+import java.util.logging.Logger;
+
+import org.zeromq.ZFrame;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMsg;
+
+/**
+ * ZeroMQ 2.2 paranoid pirate worker API.
+ */
+public class PirateWorker {
+    /**
+     * The logger for this class.
+     */
+    private static final Logger LOGGER =
+            Logger.getLogger(PirateWorker.class.getName());
+
+    /**
+     * This worker's ZMQ context.
+     */
+	private final ZMQ.Context context;
+
+    /**
+     * The endpoint to communicate with the queue on.
+     */
+	private final String queueEndpoint;
+
+    /**
+     * The time between heartbeats to and from the queue, in milliseconds.
+     */
+	private final int heartbeatInterval;
+
+    /**
+     * The time to wait for the queue to restart once it stops responding to
+     * heartbeats, in milliseconds.
+     */
+	private final int reconnectInterval;
+
+    /**
+     * The number of heartbeats to wait before declaring the queue dead.
+     */
+	private final int retries;
+
+    /**
+     * The socket used to communicate with the queue.
+     */
+	private PSocket queue;
+
+    /**
+     * A frame containing the address of the client the last request
+     * was received from.
+     */
+	private ZFrame replyTo;
+
+    /**
+     * The next time to send out a heartbeat, in nanoseconds.
+     *
+     * @see System.nanoTime()
+     */
+	private long heartbeatAt;
+
+    /**
+     * Initialise a new paranoid pirate worker instance attached to the
+     * specified endpoint with the default liveness and performance settings.
+     *
+     * @param queueEndpoint The endpoint to communicate with the queue on.
+     */
+	public PirateWorker(String queueEndpoint) {
+		this(queueEndpoint, 1000, 3, 5000, 1);
+	}
+
+    /**
+     * Initialise a new paranoid pirate queue instance attached to the specified
+     * endpoints with the specified liveness and performance settings.
+     *
+     * @param queueEndpoint The endpoint to communicate with the queue on.
+     * @param heartbeatInterval The time between heartbeats to and from the
+     *                          queue, in milliseconds.
+     * @param retries The number of heartbeats to wait before declaring the
+     *                queue dead.
+     * @param ioThreads The number of ZeroMQ I/O threads to use. Allocate
+     *                  around 1 thread for each gigabyte/second of data.
+     */
+	public PirateWorker(String queueEndpoint, int heartbeatInterval,
+			int retries, int reconnectInterval, int ioThreads) {
+		this.queueEndpoint = queueEndpoint;
+		this.heartbeatInterval = heartbeatInterval;
+		this.reconnectInterval = reconnectInterval;
+		this.retries = retries;
+
+		this.context = ZMQ.context(ioThreads);
+		this.connect();
+	}
+
+    /**
+     * Respond to the last request with the specified message and wait for a
+     * new request.
+     *
+     * @param reply The reply message. Can only be null for the initial call.
+     * @return The next request, or null if interrupted.
+     */
+	public byte[] recv(byte[] reply) {
+		if (this.replyTo != null && reply == null) {
+			throw new IllegalArgumentException(
+					"reply cannot be null for non-initial recv call.");
+		}
+
+		if (reply != null) {
+            ZMsg msg = new ZMsg();
+            msg.add(new ZFrame(reply));
+
+			msg.wrap(this.replyTo);
+			msg.send(this.queue.socket(), true);
+			this.replyTo = null;
+		}
+
+		PSocket.Poller poller = this.queue.poller();
+		int retriesLeft = this.retries;
+		while (!Thread.currentThread().isInterrupted()) {
+			if (poller.poll(this.heartbeatInterval) == -1) {
+                break; // interrupted
+            }
+
+			if (this.queue.pollIn()) {
+				ZMsg msg = this.queue.recvMulti();
+				if (msg == null) {
+					break; // interrupted
+				} else if (msg.size() == 3) {
+					this.replyTo = msg.unwrap();
+					return msg.getFirst().getData();
+				} else if (msg.size() == 1) {
+					if (Arrays.equals(msg.getFirst().getData(),
+							PirateProtocol.HEARTBEAT)) {
+						retriesLeft = this.retries;
+					} else {
+                        LOGGER.warning("Invalid control message received from queue: "
+                                + msg.toString());
+					}
+				} else {
+                    LOGGER.warning("Message of unknown type received from queue: "
+                            + msg.toString());
+				}
+			} else if (--retriesLeft == 0) {
+                try {
+                    Thread.sleep(this.reconnectInterval);
+                } catch (InterruptedException e) {
+                    break; // interrupted
+                }
+				this.connect();
+				retriesLeft = this.retries;
+			}
+
+			this.heartbeatIfNeeded();
+		}
+
+        return null;
+	}
+
+    /**
+     * Finish and close communication to the queue.
+     */
+	public void close() {
+		this.queue.close();
+		this.context.term();
+	}
+
+    /**
+     * Connect to the queue and set heartbeatAt.
+     */
+	private void connect() {
+		this.queue = new PSocket(context, ZMQ.DEALER);
+		this.queue.connect(queueEndpoint);
+		this.queue.send(PirateProtocol.READY);
+		this.heartbeatAt = System.nanoTime() +
+                ((long)this.heartbeatInterval * 1000000);
+	}
+
+    /**
+     * Send a heartbeat to the queue if needed.
+     */
+	private void heartbeatIfNeeded() {
+		if (System.nanoTime() >= this.heartbeatAt) {
+			this.queue.send(PirateProtocol.HEARTBEAT);
+			this.heartbeatAt = System.nanoTime() +
+                    ((long)this.heartbeatInterval * 1000000);
+		}
+	}
+}

src/org/lyrcl/lib/pirate/Test.java

+package org.lyrcl.lib.pirate;
+
+class Test {
+	public static void queue() {
+		new PirateQueue("tcp://*:5555", "tcp://*:5556").run();
+	}
+
+	public static void client() {
+		System.out.println("Connecting to server…");
+		PirateClient client = new PirateClient("tcp://localhost:5555");
+
+		Integer sequence = 0;
+		while (true) {
+			// We send a request, then we work to get a reply
+            sequence++;
+			final byte[] request = sequence.toString().getBytes();
+			final byte[] reply = client.req(request);
+			if (reply == null) {
+				break;
+			}
+
+			final String replyString = new String(reply).trim();
+
+			int replySequence = -1;
+			try {
+				replySequence = Integer.parseInt(replyString);
+			} catch (Exception e) {
+				// Do nothing
+			}
+
+			if (replySequence == sequence) {
+				System.out.printf("I: server replied OK (%d)\n",
+						replySequence);
+			} else {
+				System.out.printf("E: malformed reply from server: (%s)\n",
+						replyString);
+			}
+		}
+
+		client.close();
+	}
+
+	public static void worker() {
+		PirateWorker worker = new PirateWorker("tcp://localhost:5556");
+		byte[] work = worker.recv(null);
+		while (work != null) {
+			Integer n = Integer.parseInt(new String(work));
+			work = worker.recv(n.toString().getBytes());
+		}
+	}
+
+	public static void main(String[] args) {
+		Test.client();
+	}
+}

src/org/lyrcl/lib/pirate/package-info.java

+/**
+ * Paranoid Pirate Protocol implementation for ZeroMQ 2.2.
+ */
+
+package org.lyrcl.lib.pirate;
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.