Commits

Rhys ! committed ce6fff6 Draft

replaced all .java tabs with 4-spaces

Comments (0)

Files changed (6)

src/org/lyrcl/lib/pirate/PSocket.java

     /**
      * The charset to use for string (un)subscriptions.
      */
-	private static final Charset UTF8 = Charset.forName("UTF8");
+    private static final Charset UTF8 = Charset.forName("UTF8");
 
     /**
      * The underlying ZMQ socket.
      */
-	private final ZMQ.Socket socket;
+    private final ZMQ.Socket socket;
 
     /**
      * The ZMQ context of the underlying socket.
      * @param type The type of this socket.
      */
     public PSocket(ZMQ.Context context, int type) {
-    	this.context = context;
-    	this.socket = context.socket(type);
+        this.context = context;
+        this.socket = context.socket(type);
     }
 
     /**
         /**
          * The underlying ZMQ poller.
          */
-    	private final ZMQ.Poller poller;
+        private final ZMQ.Poller poller;
 
         /**
          * Initialise a Poller instance that wraps the specified ZMQ.Poller.
          *
          * @param poller The ZMQ.Poller to wrap.
          */
-    	private Poller(ZMQ.Poller poller) {
-    		this.poller = poller;
-    	}
+        private Poller(ZMQ.Poller poller) {
+            this.poller = poller;
+        }
 
         /**
          * Also listen for incoming messages on the specified PSocket.
          * @param socket The additional socket to listen on.
          * @return This (for chaining).
          */
-    	public Poller with(PSocket socket) {
-    		this.poller.register(socket.socket, ZMQ.Poller.POLLIN);
-    		return this;
-    	}
+        public Poller with(PSocket socket) {
+            this.poller.register(socket.socket, ZMQ.Poller.POLLIN);
+            return this;
+        }
 
         /**
          * Get the underlying ZMQ Poller.
          *
          * @return The underlying ZMQ Poller.
          */
-    	public ZMQ.Poller poller() {
-    		return this.poller;
-    	}
+        public ZMQ.Poller poller() {
+            return this.poller;
+        }
 
         /**
          * Poll for incoming messages with no timeout.
      * @return true if send was successful, false otherwise.
      */
     public Boolean send(byte[] bytes) {
-    	return this.send(bytes, -1);
+        return this.send(bytes, -1);
     }
 
     /**
      */
     public Boolean send(byte[] bytes, int timeout, boolean last) {
         int flags = last ? 0 : ZMQ.SNDMORE;
-    	this.setSendTimeoutIfNeeded(timeout);
+        this.setSendTimeoutIfNeeded(timeout);
         return this.socket.send(bytes, flags);
     }
 
      * @param timeout The send timeout, in milliseconds.
      */
     private void setSendTimeoutIfNeeded(int timeout) {
-    	if (this.sendTimeout != timeout) {
-    		this.socket.setSendTimeOut(timeout);
-    		this.sendTimeout = timeout;
-    	}
+        if (this.sendTimeout != timeout) {
+            this.socket.setSendTimeOut(timeout);
+            this.sendTimeout = timeout;
+        }
     }
 
     /**
      * @return The contents of a single-part message, or null if interrupted.
      */
     public byte[] recv() {
-    	return this.recv(-1);
+        return this.recv(-1);
     }
 
     /**
      *         or timed out.
      */
     public byte[] recv(int timeout) {
-    	this.setRecvTimeoutIfNeeded(timeout);
-    	return this.socket.recv(0);
+        this.setRecvTimeoutIfNeeded(timeout);
+        return this.socket.recv(0);
     }
 
     /**
      * @return The multi-part message, or null if interrupted.
      */
     public ZMsg recvMulti() {
-    	return this.recvMulti(-1);
+        return this.recvMulti(-1);
     }
 
     /**
      * @return The multi-part message, or null if interrupted or timed out.
      */
     public ZMsg recvMulti(int timeout) {
-    	this.setRecvTimeoutIfNeeded(timeout);
-    	return ZMsg.recvMsg(this.socket);
+        this.setRecvTimeoutIfNeeded(timeout);
+        return ZMsg.recvMsg(this.socket);
     }
 
     /**
      * @param timeout The receive timeout in milliseconds.
      */
     private void setRecvTimeoutIfNeeded(int timeout) {
-    	if (this.recvTimeout != timeout) {
-    		this.socket.setReceiveTimeOut(timeout);
-    		this.recvTimeout = timeout;
-    	}
+        if (this.recvTimeout != timeout) {
+            this.socket.setReceiveTimeOut(timeout);
+            this.recvTimeout = timeout;
+        }
     }
 
     /**
      * @return This (for chaining).
      */
     public PSocket sub(String prefix) {
-    	return this.sub(prefix.getBytes(PSocket.UTF8));
+        return this.sub(prefix.getBytes(PSocket.UTF8));
     }
 
     /**
      * @return This (for chaining).
      */
     public PSocket sub(byte[] prefix) {
-    	this.socket.subscribe(prefix);
-    	return this;
+        this.socket.subscribe(prefix);
+        return this;
     }
 
     /**
      * @return This (for chaining).
      */
     public PSocket unsub(byte[] prefix) {
-    	this.socket.unsubscribe(prefix);
-    	return this;
+        this.socket.unsubscribe(prefix);
+        return this;
     }
 
     /**
      * @return A poller that listens for incoming messages on this instance.
      */
     public PSocket.Poller poller() {
-    	return new Poller(this.context.poller()).with(this);
+        return new Poller(this.context.poller()).with(this);
     }
 
     /**
      *         the specified socket.
      */
     public PSocket.Poller with(PSocket socket) {
-    	return this.poller().with(socket);
+        return this.poller().with(socket);
     }
 
     /**

src/org/lyrcl/lib/pirate/PirateClient.java

     /**
      * The maximum time to wait for a reply to each attempt, in milliseconds.
      */
-	private final int timeout;
+    private final int timeout;
 
     /**
      * The number of times to try a request before giving up.
      */
-	private final int retries;
+    private final int retries;
 
     /**
      * The endpoint to communicate with the queue on.
      */
-	private final String queueEndpoint;
+    private final String queueEndpoint;
 
     /**
      * This client's ZMQ context.
     /**
      * The socket used to communicate with the queue.
      */
-	private PSocket socket;
+    private PSocket socket;
 
     /**
      * Initialise a new paranoid pirate client instance attached to the specified
      *
      * @param queueEndpoint The endpoint to communicate with the queue on.
      */
-	public PirateClient(String queueEndpoint) {
-		this(queueEndpoint, 2500, 3, 1);
-	}
+    public PirateClient(String queueEndpoint) {
+        this(queueEndpoint, 2500, 3, 1);
+    }
 
     /**
      * Initialise a new paranoid pirate client instance attached to the specified
      * @param ioThreads The number of ZeroMQ I/O threads to use. Allocate
      *                  around 1 thread for each gigabyte/second of data.
      */
-	public PirateClient(String queueEndpoint, int timeout, int retries, int ioThreads) {
-		this.timeout = timeout;
-		this.retries = retries;
-		this.queueEndpoint = queueEndpoint;
+    public PirateClient(String queueEndpoint, int timeout, int retries, int ioThreads) {
+        this.timeout = timeout;
+        this.retries = retries;
+        this.queueEndpoint = queueEndpoint;
 
-		this.context = ZMQ.context(ioThreads);
-	}
+        this.context = ZMQ.context(ioThreads);
+    }
 
     /**
      * Make a synchronous request.
      * @param request The request message.
      * @return The reply, or null for a timeout.
      */
-	public byte[] req(byte[] request) {
-		for (int i = 0; i < this.retries; i++) {
-			this.connectIfNeeded();
-			this.socket.send(request, 0);
+    public byte[] req(byte[] request) {
+        for (int i = 0; i < this.retries; i++) {
+            this.connectIfNeeded();
+            this.socket.send(request, 0);
             byte[] response = this.socket.recv(this.timeout);
-			if (response != null) {
-				return response;
-			} else {
-				this.socket.dropAndClose();
-				this.socket = null;
-			}
-		}
+            if (response != null) {
+                return response;
+            } else {
+                this.socket.dropAndClose();
+                this.socket = null;
+            }
+        }
 
-		return null;
-	}
+        return null;
+    }
 
     /**
      * Finish and close communication with the queue.
      */
-	public void close() {
-		if (this.socket != null) {
-			this.socket.close();
-		}
+    public void close() {
+        if (this.socket != null) {
+            this.socket.close();
+        }
 
-		this.context.term();
-	}
+        this.context.term();
+    }
 
     /**
      * Connect to the queue if not already.
      */
-	private void connectIfNeeded() {
-		if (this.socket == null) {
-			this.socket = new PSocket(this.context, ZMQ.REQ);
-			this.socket.connect(this.queueEndpoint);
-		}
-	}
+    private void connectIfNeeded() {
+        if (this.socket == null) {
+            this.socket = new PSocket(this.context, ZMQ.REQ);
+            this.socket.connect(this.queueEndpoint);
+        }
+    }
 }

src/org/lyrcl/lib/pirate/PirateProtocol.java

     /**
      * The bytes signifying a new worker is ready for work.
      */
-	public static byte[] READY = new byte[] { 1 };
+    public static byte[] READY = new byte[] { 1 };
 
     /**
      * The bytes signifying an existing component is still alive.
      */
-	public static byte[] HEARTBEAT = new byte[] { 2 };
+    public static byte[] HEARTBEAT = new byte[] { 2 };
 }

src/org/lyrcl/lib/pirate/PirateQueue.java

     /**
      * The number of heartbeats to wait before declaring a worker dead.
      */
-	private final int retries;
+    private final int retries;
 
     /**
      * The time between heartbeats to and from workers, in nanoseconds.
      */
-	private final int heartbeatInterval;
+    private final int heartbeatInterval;
 
     /**
      * The endpoint to communicate with clients on.
      */
-	private final String clientEndpoint;
+    private final String clientEndpoint;
 
     /**
      * The endpoint to communicate with workers on.
      */
-	private final String workerEndpoint;
+    private final String workerEndpoint;
 
     /**
      * This queue's ZMQ context.
      */
-	private final ZMQ.Context context;
+    private final ZMQ.Context context;
 
     /**
      * A remote worker with an address and expiry.
      */
-	private class Worker {
+    private class Worker {
         /**
          * The address of the worker (used in message wrapping).
          */
-		public byte[] address;
+        public byte[] address;
 
         /**
          * The time when the worker will be removed from the queue due to
          *
          * @see System.nanoTime()
          */
-		public long expiry;
+        public long expiry;
 
         /**
          * Initialise a Worker instance with the specified address and
          *
          * @param address The address of the worker.
          */
-		public Worker(byte[] address) {
-			this.address = address;
-			this.expiry = System.currentTimeMillis() +
+        public Worker(byte[] address) {
+            this.address = address;
+            this.expiry = System.currentTimeMillis() +
                     PirateQueue.this.retries * PirateQueue.this.heartbeatInterval;
-		}
+        }
 
-		@Override
-		public int hashCode() {
-			return Arrays.hashCode(this.address);
-		}
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(this.address);
+        }
 
-		@Override
-		public boolean equals(Object other) {
-			return other instanceof Worker &&
-				Arrays.equals(this.address,
-					((Worker) other).address);
-		}
-	}
+        @Override
+        public boolean equals(Object other) {
+            return other instanceof Worker &&
+                Arrays.equals(this.address,
+                    ((Worker) other).address);
+        }
+    }
 
     /**
      * A queue of live workers available for work.
      */
-	private class WorkerQueue {
+    private class WorkerQueue {
         /**
          * The available workers.
          */
-		private final Deque<Worker> queue = new ArrayDeque<Worker>();
+        private final Deque<Worker> queue = new ArrayDeque<Worker>();
 
         /**
          * The next time to send out heartbeats, in nanoseconds.
          *
          * @see System.nanoTime()
          */
-		private long heartbeatAt = System.nanoTime()
+        private long heartbeatAt = System.nanoTime()
                 + PirateQueue.this.heartbeatInterval;
 
         /**
          *
          * @param worker The worker.
          */
-		public void ready(Worker worker) {
-			this.queue.remove(worker);
-			this.queue.addLast(worker);
-		}
+        public void ready(Worker worker) {
+            this.queue.remove(worker);
+            this.queue.addLast(worker);
+        }
 
         /**
          * Pop a worker from the head of the queue and return its address.
          *
          * @return The address of a worker available for work.
          */
-		public byte[] next() {
-			return this.queue.pop().address;
-		}
+        public byte[] next() {
+            return this.queue.pop().address;
+        }
 
         /**
          * Check if the queue is empty.
          *
          * @return true if no workers are available for work, false otherwise.
          */
-		public boolean isEmpty() {
-			return this.queue.isEmpty();
-		}
+        public boolean isEmpty() {
+            return this.queue.isEmpty();
+        }
 
         /**
          * Send out heartbeats to available workers and remove expired workers.
          *
          * @param workerSock The socket to send heartbeats to workers from.
          */
-		public void maintain(PSocket workerSock) {
-			if (System.nanoTime() >= this.heartbeatAt) {
-				for (Worker w: this.queue) {
+        public void maintain(PSocket workerSock) {
+            if (System.nanoTime() >= this.heartbeatAt) {
+                for (Worker w: this.queue) {
                     workerSock.send(w.address, -1, false);
-					workerSock.send(PirateProtocol.HEARTBEAT, -1, true);
-				}
+                    workerSock.send(PirateProtocol.HEARTBEAT, -1, true);
+                }
 
-				this.heartbeatAt = System.nanoTime() +
-					PirateQueue.this.heartbeatInterval;
-			}
+                this.heartbeatAt = System.nanoTime() +
+                    PirateQueue.this.heartbeatInterval;
+            }
 
-			long time = System.nanoTime();
-			Iterator<Worker> i = this.queue.iterator();
-			while (i.hasNext() && i.next().expiry < time) {
-				i.remove();
-			}
-		}
-	}
+            long time = System.nanoTime();
+            Iterator<Worker> i = this.queue.iterator();
+            while (i.hasNext() && i.next().expiry < time) {
+                i.remove();
+            }
+        }
+    }
 
     /**
      * Initialise a new paranoid pirate queue instance attached to the specified
      * @param clientEndpoint The endpoint to communicate with clients on.
      * @param workerEndpoint The endpoint to communicate with workers on.
      */
-	public PirateQueue(String clientEndpoint, String workerEndpoint) {
-		this(clientEndpoint, workerEndpoint, 1000, 3, 1);
-	}
+    public PirateQueue(String clientEndpoint, String workerEndpoint) {
+        this(clientEndpoint, workerEndpoint, 1000, 3, 1);
+    }
 
     /**
      * Initialise a new paranoid pirate queue instance attached to the specified
      * @param ioThreads The number of ZeroMQ I/O threads to use. Allocate
      *                  around 1 thread for each gigabyte/second of data.
      */
-	public PirateQueue(String clientEndpoint, String workerEndpoint,
-			int heartbeatInterval, int retries, int ioThreads) {
-		this.heartbeatInterval = heartbeatInterval * 1000000;
-		this.retries = retries;
+    public PirateQueue(String clientEndpoint, String workerEndpoint,
+            int heartbeatInterval, int retries, int ioThreads) {
+        this.heartbeatInterval = heartbeatInterval * 1000000;
+        this.retries = retries;
 
-		this.context = ZMQ.context(ioThreads);
-		this.clientEndpoint = clientEndpoint;
-		this.workerEndpoint = workerEndpoint;
-	}
+        this.context = ZMQ.context(ioThreads);
+        this.clientEndpoint = clientEndpoint;
+        this.workerEndpoint = workerEndpoint;
+    }
 
     /**
      * Run as a paranoid pirate queue until interrupted.
      */
     @Override
-	public void run() {
-		PSocket clientSock = new PSocket(this.context, ZMQ.ROUTER);
-		PSocket workerSock = new PSocket(this.context, ZMQ.ROUTER);
-		clientSock.bind(this.clientEndpoint);
-		workerSock.bind(this.workerEndpoint);
+    public void run() {
+        PSocket clientSock = new PSocket(this.context, ZMQ.ROUTER);
+        PSocket workerSock = new PSocket(this.context, ZMQ.ROUTER);
+        clientSock.bind(this.clientEndpoint);
+        workerSock.bind(this.workerEndpoint);
 
-		PSocket.Poller pollWorkers = workerSock.poller();
+        PSocket.Poller pollWorkers = workerSock.poller();
         PSocket.Poller pollBoth = workerSock.with(clientSock);
-		WorkerQueue workers = new WorkerQueue();
+        WorkerQueue workers = new WorkerQueue();
 
-		while (!Thread.currentThread().isInterrupted()) {
+        while (!Thread.currentThread().isInterrupted()) {
             // No point polling for clients if there's no workers
             (workers.isEmpty() ? pollWorkers : pollBoth).poll();
 
-			if (workerSock.pollIn()) {
-				ZMsg msg = workerSock.recvMulti();
-				if (msg == null) {
-					break; // interrupted
-				}
+            if (workerSock.pollIn()) {
+                ZMsg msg = workerSock.recvMulti();
+                if (msg == null) {
+                    break; // interrupted
+                }
 
-				ZFrame address = msg.unwrap();
-				workers.ready(new Worker(address.getData()));
+                ZFrame address = msg.unwrap();
+                workers.ready(new Worker(address.getData()));
 
-				if (msg.size() == 1) { // control message
-					ZFrame frame = msg.getFirst();
-					if (!Arrays.equals(PirateProtocol.HEARTBEAT, frame.getData()) &&
-							!Arrays.equals(PirateProtocol.READY, frame.getData())) {
+                if (msg.size() == 1) { // control message
+                    ZFrame frame = msg.getFirst();
+                    if (!Arrays.equals(PirateProtocol.HEARTBEAT, frame.getData()) &&
+                            !Arrays.equals(PirateProtocol.READY, frame.getData())) {
                         LOGGER.warning(
                                 "Invalid control message received from worker: "
                                         + msg.toString());
-					}
-				} else {
-					msg.send(clientSock.socket());
-				}
-			}
+                    }
+                } else {
+                    msg.send(clientSock.socket());
+                }
+            }
 
-			if (clientSock.pollIn()) {
-				ZMsg msg = clientSock.recvMulti();
-				if (msg == null) {
-					break; // interrupted
-				} else {
-					msg.push(workers.next());
-					msg.send(workerSock.socket());
-				}
-			}
+            if (clientSock.pollIn()) {
+                ZMsg msg = clientSock.recvMulti();
+                if (msg == null) {
+                    break; // interrupted
+                } else {
+                    msg.push(workers.next());
+                    msg.send(workerSock.socket());
+                }
+            }
 
-			workers.maintain(workerSock);
-		}
+            workers.maintain(workerSock);
+        }
 
-		this.context.term();
-	}
+        this.context.term();
+    }
 }

src/org/lyrcl/lib/pirate/PirateWorker.java

     /**
      * This worker's ZMQ context.
      */
-	private final ZMQ.Context context;
+    private final ZMQ.Context context;
 
     /**
      * The endpoint to communicate with the queue on.
      */
-	private final String queueEndpoint;
+    private final String queueEndpoint;
 
     /**
      * The time between heartbeats to and from the queue, in milliseconds.
      */
-	private final int heartbeatInterval;
+    private final int heartbeatInterval;
 
     /**
      * The time to wait for the queue to restart once it stops responding to
      * heartbeats, in milliseconds.
      */
-	private final int reconnectInterval;
+    private final int reconnectInterval;
 
     /**
      * The number of heartbeats to wait before declaring the queue dead.
      */
-	private final int retries;
+    private final int retries;
 
     /**
      * The socket used to communicate with the queue.
      */
-	private PSocket queue;
+    private PSocket queue;
 
     /**
      * A frame containing the address of the client the last request
      * was received from.
      */
-	private ZFrame replyTo;
+    private ZFrame replyTo;
 
     /**
      * The next time to send out a heartbeat, in nanoseconds.
      *
      * @see System.nanoTime()
      */
-	private long heartbeatAt;
+    private long heartbeatAt;
 
     /**
      * Initialise a new paranoid pirate worker instance attached to the
      *
      * @param queueEndpoint The endpoint to communicate with the queue on.
      */
-	public PirateWorker(String queueEndpoint) {
-		this(queueEndpoint, 1000, 3, 5000, 1);
-	}
+    public PirateWorker(String queueEndpoint) {
+        this(queueEndpoint, 1000, 3, 5000, 1);
+    }
 
     /**
      * Initialise a new paranoid pirate queue instance attached to the specified
      * @param ioThreads The number of ZeroMQ I/O threads to use. Allocate
      *                  around 1 thread for each gigabyte/second of data.
      */
-	public PirateWorker(String queueEndpoint, int heartbeatInterval,
-			int retries, int reconnectInterval, int ioThreads) {
-		this.queueEndpoint = queueEndpoint;
-		this.heartbeatInterval = heartbeatInterval;
-		this.reconnectInterval = reconnectInterval;
-		this.retries = retries;
+    public PirateWorker(String queueEndpoint, int heartbeatInterval,
+            int retries, int reconnectInterval, int ioThreads) {
+        this.queueEndpoint = queueEndpoint;
+        this.heartbeatInterval = heartbeatInterval;
+        this.reconnectInterval = reconnectInterval;
+        this.retries = retries;
 
-		this.context = ZMQ.context(ioThreads);
-		this.connect();
-	}
+        this.context = ZMQ.context(ioThreads);
+        this.connect();
+    }
 
     /**
      * Respond to the last request with the specified message and wait for a
      * @param reply The reply message. Can only be null for the initial call.
      * @return The next request, or null if interrupted.
      */
-	public byte[] recv(byte[] reply) {
-		if (this.replyTo != null && reply == null) {
-			throw new IllegalArgumentException(
-					"reply cannot be null for non-initial recv call.");
-		}
+    public byte[] recv(byte[] reply) {
+        if (this.replyTo != null && reply == null) {
+            throw new IllegalArgumentException(
+                    "reply cannot be null for non-initial recv call.");
+        }
 
-		if (reply != null) {
+        if (reply != null) {
             ZMsg msg = new ZMsg();
             msg.add(new ZFrame(reply));
 
-			msg.wrap(this.replyTo);
-			msg.send(this.queue.socket(), true);
-			this.replyTo = null;
-		}
+            msg.wrap(this.replyTo);
+            msg.send(this.queue.socket(), true);
+            this.replyTo = null;
+        }
 
-		PSocket.Poller poller = this.queue.poller();
-		int retriesLeft = this.retries;
-		while (!Thread.currentThread().isInterrupted()) {
-			if (poller.poll(this.heartbeatInterval) == -1) {
+        PSocket.Poller poller = this.queue.poller();
+        int retriesLeft = this.retries;
+        while (!Thread.currentThread().isInterrupted()) {
+            if (poller.poll(this.heartbeatInterval) == -1) {
                 break; // interrupted
             }
 
-			if (this.queue.pollIn()) {
-				ZMsg msg = this.queue.recvMulti();
-				if (msg == null) {
-					break; // interrupted
-				} else if (msg.size() == 3) {
-					this.replyTo = msg.unwrap();
-					return msg.getFirst().getData();
-				} else if (msg.size() == 1) {
-					if (Arrays.equals(msg.getFirst().getData(),
-							PirateProtocol.HEARTBEAT)) {
-						retriesLeft = this.retries;
-					} else {
+            if (this.queue.pollIn()) {
+                ZMsg msg = this.queue.recvMulti();
+                if (msg == null) {
+                    break; // interrupted
+                } else if (msg.size() == 3) {
+                    this.replyTo = msg.unwrap();
+                    return msg.getFirst().getData();
+                } else if (msg.size() == 1) {
+                    if (Arrays.equals(msg.getFirst().getData(),
+                            PirateProtocol.HEARTBEAT)) {
+                        retriesLeft = this.retries;
+                    } else {
                         LOGGER.warning("Invalid control message received from queue: "
                                 + msg.toString());
-					}
-				} else {
+                    }
+                } else {
                     LOGGER.warning("Message of unknown type received from queue: "
                             + msg.toString());
-				}
-			} else if (--retriesLeft == 0) {
+                }
+            } else if (--retriesLeft == 0) {
                 try {
                     Thread.sleep(this.reconnectInterval);
                 } catch (InterruptedException e) {
                     break; // interrupted
                 }
-				this.connect();
-				retriesLeft = this.retries;
-			}
+                this.connect();
+                retriesLeft = this.retries;
+            }
 
-			this.heartbeatIfNeeded();
-		}
+            this.heartbeatIfNeeded();
+        }
 
         return null;
-	}
+    }
 
     /**
      * Finish and close communication to the queue.
      */
-	public void close() {
-		this.queue.close();
-		this.context.term();
-	}
+    public void close() {
+        this.queue.close();
+        this.context.term();
+    }
 
     /**
      * Connect to the queue and set heartbeatAt.
      */
-	private void connect() {
-		this.queue = new PSocket(context, ZMQ.DEALER);
-		this.queue.connect(queueEndpoint);
-		this.queue.send(PirateProtocol.READY);
-		this.heartbeatAt = System.nanoTime() +
+    private void connect() {
+        this.queue = new PSocket(context, ZMQ.DEALER);
+        this.queue.connect(queueEndpoint);
+        this.queue.send(PirateProtocol.READY);
+        this.heartbeatAt = System.nanoTime() +
                 ((long)this.heartbeatInterval * 1000000);
-	}
+    }
 
     /**
      * Send a heartbeat to the queue if needed.
      */
-	private void heartbeatIfNeeded() {
-		if (System.nanoTime() >= this.heartbeatAt) {
-			this.queue.send(PirateProtocol.HEARTBEAT);
-			this.heartbeatAt = System.nanoTime() +
+    private void heartbeatIfNeeded() {
+        if (System.nanoTime() >= this.heartbeatAt) {
+            this.queue.send(PirateProtocol.HEARTBEAT);
+            this.heartbeatAt = System.nanoTime() +
                     ((long)this.heartbeatInterval * 1000000);
-		}
-	}
+        }
+    }
 }

src/org/lyrcl/lib/pirate/Test.java

 package org.lyrcl.lib.pirate;
 
 class Test {
-	public static void queue() {
-		new PirateQueue("tcp://*:5555", "tcp://*:5556").run();
-	}
+    public static void queue() {
+        new PirateQueue("tcp://*:5555", "tcp://*:5556").run();
+    }
 
-	public static void client() {
-		System.out.println("Connecting to server…");
-		PirateClient client = new PirateClient("tcp://localhost:5555");
+    public static void client() {
+        System.out.println("Connecting to server…");
+        PirateClient client = new PirateClient("tcp://localhost:5555");
 
-		Integer sequence = 0;
-		while (true) {
-			// We send a request, then we work to get a reply
+        Integer sequence = 0;
+        while (true) {
+            // We send a request, then we work to get a reply
             sequence++;
-			final byte[] request = sequence.toString().getBytes();
-			final byte[] reply = client.req(request);
-			if (reply == null) {
-				break;
-			}
+            final byte[] request = sequence.toString().getBytes();
+            final byte[] reply = client.req(request);
+            if (reply == null) {
+                break;
+            }
 
-			final String replyString = new String(reply).trim();
+            final String replyString = new String(reply).trim();
 
-			int replySequence = -1;
-			try {
-				replySequence = Integer.parseInt(replyString);
-			} catch (Exception e) {
-				// Do nothing
-			}
+            int replySequence = -1;
+            try {
+                replySequence = Integer.parseInt(replyString);
+            } catch (Exception e) {
+                // Do nothing
+            }
 
-			if (replySequence == sequence) {
-				System.out.printf("I: server replied OK (%d)\n",
-						replySequence);
-			} else {
-				System.out.printf("E: malformed reply from server: (%s)\n",
-						replyString);
-			}
-		}
+            if (replySequence == sequence) {
+                System.out.printf("I: server replied OK (%d)\n",
+                        replySequence);
+            } else {
+                System.out.printf("E: malformed reply from server: (%s)\n",
+                        replyString);
+            }
+        }
 
-		client.close();
-	}
+        client.close();
+    }
 
-	public static void worker() {
-		PirateWorker worker = new PirateWorker("tcp://localhost:5556");
-		byte[] work = worker.recv(null);
-		while (work != null) {
-			Integer n = Integer.parseInt(new String(work));
-			work = worker.recv(n.toString().getBytes());
-		}
-	}
+    public static void worker() {
+        PirateWorker worker = new PirateWorker("tcp://localhost:5556");
+        byte[] work = worker.recv(null);
+        while (work != null) {
+            Integer n = Integer.parseInt(new String(work));
+            work = worker.recv(n.toString().getBytes());
+        }
+    }
 
-	public static void main(String[] args) {
-		Test.client();
-	}
+    public static void main(String[] args) {
+        Test.client();
+    }
 }