Commits

Luke Hoersten committed 5a43109

Forgot to move unit tests to the discoproject.org namespace.

  • Participants
  • Parent commits 01be435

Comments (0)

Files changed (10)

test/com/allstontrading/fx/rd/disco/worker/DecodersTest.java

-package com.allstontrading.fx.rd.disco.worker;
-
-import static org.junit.Assert.*;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.discoproject.worker.protocol.decoder.FailDecoder;
-import org.discoproject.worker.protocol.decoder.HeaderDecoder;
-import org.discoproject.worker.protocol.decoder.InputDecoder;
-import org.discoproject.worker.protocol.decoder.RetryDecoder;
-import org.discoproject.worker.protocol.decoder.WaitDecoder;
-import org.discoproject.worker.protocol.decoder.types.DiscoInput;
-import org.discoproject.worker.protocol.decoder.types.DiscoInputReplica;
-import org.discoproject.worker.protocol.decoder.types.DiscoInputReplicaProtocol;
-import org.discoproject.worker.protocol.decoder.types.DiscoInputStatus;
-import org.junit.Test;
-
-public class DecodersTest {
-
-	@Test
-	public void testFailDecoder() {
-		final HeaderDecoder headerDecoder = new HeaderDecoder();
-		final FailDecoder decoder = new FailDecoder();
-
-		final String inputStr = "FAIL 16 [1234,[5,6,7,8]]\n";
-		final ByteBuffer bb = ByteBuffer.wrap(inputStr.getBytes());
-
-		decoder.parse(bb, headerDecoder.parse(bb).getPayloadLength());
-
-		assertEquals(1234, decoder.getInputId());
-		final List<Integer> replicaIds = decoder.getReplicaIds();
-		for (int i = 0; i < 4; i++) {
-			assertEquals((i + 5), replicaIds.get(i).intValue());
-		}
-	}
-
-	@Test
-	public void testRetryDecoder() {
-		final HeaderDecoder headerDecoder = new HeaderDecoder();
-		final RetryDecoder decoder = new RetryDecoder();
-
-		final String inputStr = "RETRY 33 [[0,\"location0\"],[1,\"location1\"]]\n";
-		final ByteBuffer bb = ByteBuffer.wrap(inputStr.getBytes());
-
-		decoder.parse(bb, headerDecoder.parse(bb).getPayloadLength());
-		final List<DiscoInputReplica> replicas = decoder.getReplicas();
-	}
-
-	@Test
-	public void testWaitDecoder() {
-		final HeaderDecoder headerDecoder = new HeaderDecoder();
-		final WaitDecoder decoder = new WaitDecoder();
-
-		final String inputStr = "WAIT 3 100\n";
-		final ByteBuffer bb = ByteBuffer.wrap(inputStr.getBytes());
-
-		decoder.parse(bb, headerDecoder.parse(bb).getPayloadLength());
-		assertEquals(100, decoder.getPauseSeconds());
-	}
-
-	@Test
-	public void testInputDecoder() {
-		final HeaderDecoder headerDecoder = new HeaderDecoder();
-		final InputDecoder inputDecoder = new InputDecoder();
-
-		final String inputStr = "INPUT 110 [\"done\",[[0,\"ok\",[[0,\"raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD\"]]]]]\n";
-		final ByteBuffer bb = ByteBuffer.wrap(inputStr.getBytes());
-
-		inputDecoder.parse(bb, headerDecoder.parse(bb).getPayloadLength());
-
-		assertTrue(inputDecoder.isDone());
-
-		final List<DiscoInput> inputs = inputDecoder.getInputs();
-		final DiscoInput input = inputs.get(0);
-		assertEquals(0, input.getId());
-		assertEquals(DiscoInputStatus.ok, input.getStatus());
-
-		final List<DiscoInputReplica> replicas = input.getReplicas();
-		final DiscoInputReplica replica = replicas.get(0);
-
-		assertEquals(0, replica.getId());
-		assertEquals("raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD", replica.getURI().toString());
-		assertEquals(DiscoInputReplicaProtocol.raw, replica.getScheme());
-	}
-
-}

test/com/allstontrading/fx/rd/disco/worker/DiscoJobPackTest.java

-package com.allstontrading.fx.rd.disco.worker;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.discoproject.job.jobpack.DiscoJobPack;
-import org.discoproject.utils.JsonUtils;
-import org.junit.Test;
-
-public class DiscoJobPackTest {
-
-	@Test
-	public void testDiscoJobPack() throws URISyntaxException {
-
-		final DiscoJobPack jobPack = new DiscoJobPack("jobName", new File("jobPackFile.txt"), true, true, new File("jobHome"), new byte[] {
-		        1, 2, 3, 4 });
-
-		jobPack.addEnvVar("env1", "value1");
-		jobPack.addEnvVar("env2", "value2");
-
-		jobPack.addInput(new URI("raw://therawuriinput"));
-		jobPack.addInput(new URI("raw://therawuriinput2"));
-
-		final ByteBuffer buffer = ByteBuffer.allocate(1024);
-		jobPack.write(buffer);
-		buffer.flip();
-
-		buffer.getInt(); // magic!
-		final int jobDictOffset = buffer.getInt();
-		final int jobEnvsOffset = buffer.getInt();
-		final int jobHomeOffset = buffer.getInt();
-
-		buffer.position(128);
-		final String dictJson = new String(buffer.array(), jobDictOffset, jobEnvsOffset - jobDictOffset);
-		final String envsJson = new String(buffer.array(), jobEnvsOffset, jobHomeOffset - jobEnvsOffset);
-
-		final Map<String, String> dictObj = JsonUtils.asObject(dictJson);
-		assertEquals("jobPackFile.txt", JsonUtils.asString("worker", dictObj));
-		assertEquals("true", JsonUtils.asString("map?", dictObj));
-		assertEquals("true", JsonUtils.asString("reduce?", dictObj));
-		assertEquals("1", JsonUtils.asString("nr_reduces", dictObj));
-		assertEquals("jobName", JsonUtils.asString("prefix", dictObj));
-		assertEquals("max_cores", JsonUtils.asString("scheduler", dictObj));
-
-		final List<String> inputs = JsonUtils.asArray(dictObj.get("input"));
-		assertEquals(2, inputs.size());
-		assertEquals("raw://therawuriinput", JsonUtils.asString(0, inputs));
-		assertEquals("raw://therawuriinput2", JsonUtils.asString(1, inputs));
-		final Map<String, String> envObj = JsonUtils.asObject(envsJson);
-		assertEquals("value1", JsonUtils.asString("env1", envObj));
-		assertEquals("value2", JsonUtils.asString("env2", envObj));
-	}
-
-}

test/com/allstontrading/fx/rd/disco/worker/DiscoTest.java

-package com.allstontrading.fx.rd.disco.worker;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Pipe;
-import java.nio.channels.Pipe.SinkChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.List;
-
-import org.discoproject.DiscoUtils;
-import org.discoproject.worker.DiscoWorker;
-import org.discoproject.worker.protocol.DiscoIOChannel;
-import org.discoproject.worker.protocol.decoder.DiscoWorkerDecoder;
-import org.discoproject.worker.protocol.decoder.DiscoWorkerListener;
-import org.discoproject.worker.protocol.decoder.HeaderDecoder;
-import org.discoproject.worker.protocol.decoder.InputDecoder;
-import org.discoproject.worker.protocol.decoder.types.DiscoInput;
-import org.discoproject.worker.protocol.decoder.types.DiscoInputReplica;
-import org.discoproject.worker.protocol.decoder.types.DiscoInputReplicaProtocol;
-import org.discoproject.worker.protocol.decoder.types.DiscoInputStatus;
-import org.discoproject.worker.protocol.encoder.WorkerAnnounceEncoder;
-import org.junit.Test;
-import org.mockito.InOrder;
-
-/**
- * @author Luke Hoersten <lhoersten@allstontrading.com>
- * 
- */
-public class DiscoTest {
-
-	@Test
-	public void testWorkerAnnounceEncode() {
-		final String expected = "WORKER 29 {\"pid\":25094,\"version\":\"1.0\"}\n";
-		assertEquals(expected, new WorkerAnnounceEncoder().set("1.0", 25094).toString());
-	}
-
-	@Test
-	public void testWorkerAnnounce() throws IOException {
-		final DiscoWorkerListener listener = mock(DiscoWorkerListener.class);
-		final InOrder inOrder = inOrder(listener);
-
-		final Pipe pipeFromDisco = Pipe.open();
-		final DiscoIOChannel ioChannel = new DiscoIOChannel(pipeFromDisco.source(), new NullByteChannel(),
-		        new DiscoWorkerDecoder().setListener(listener));
-
-		final String okStr = "OK 4 \"ok\"\n";
-		pipeFromDisco.sink().write(ByteBuffer.wrap(okStr.getBytes()));
-		ioChannel.write(new WorkerAnnounceEncoder().set("1.0", 555));
-
-		inOrder.verify(listener).ok();
-	}
-
-	@Test
-	public void testInputDecoder() {
-		final HeaderDecoder headerDecoder = new HeaderDecoder();
-		final InputDecoder inputDecoder = new InputDecoder();
-
-		final String inputStr = "INPUT 110 [\"done\",[[0,\"ok\",[[0,\"raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD\"]]]]]\n";
-		final ByteBuffer bb = ByteBuffer.wrap(inputStr.getBytes());
-
-		inputDecoder.parse(bb, headerDecoder.parse(bb).getPayloadLength());
-
-		assertTrue(inputDecoder.isDone());
-
-		final List<DiscoInput> inputs = inputDecoder.getInputs();
-		final DiscoInput input = inputs.get(0);
-		assertEquals(0, input.getId());
-		assertEquals(DiscoInputStatus.ok, input.getStatus());
-
-		final List<DiscoInputReplica> replicas = input.getReplicas();
-		final DiscoInputReplica replica = replicas.get(0);
-
-		assertEquals(0, replica.getId());
-		assertEquals("raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD", replica.getURI().toString());
-		assertEquals(DiscoInputReplicaProtocol.raw, replica.getScheme());
-	}
-
-	@Test
-	public void testInputFlow() throws IOException {
-		final Pipe pipeFromDisco = Pipe.open();
-
-		final DiscoWorker discoWorker = new DiscoWorker(pipeFromDisco.source(), new NullByteChannel());
-		final SinkChannel disco = pipeFromDisco.sink();
-
-		// Put messages in the input pipeline (from the disco master)
-		final String okStr = "OK 4 \"ok\"\n";
-		disco.write(ByteBuffer.wrap(okStr.getBytes()));
-		final String taskStr = "TASK 327 {\"taskid\":0,\"master\":\"http://lhoersten-66113:8989\",\"disco_port\":8989,\"put_port\":8990,\"ddfs_data\":\"/srv/disco/ddfs\",\"disco_data\":\"/srv/disco/data\",\"mode\":\"map\",\"jobfile\":\"/srv/disco/data/localhost/5a/lhoersten-FunshineSimulator@524:7310e:1cb44/jobfile\",\"jobname\":\"lhoersten-FunshineSimulator@524:7310e:1cb44\",\"host\":\"localhost\"}\n";
-		disco.write(ByteBuffer.wrap(taskStr.getBytes()));
-		final String inputStr = "INPUT 110 [\"done\",[[0,\"ok\",[[0,\"raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD\"]]]]]\n";
-		disco.write(ByteBuffer.wrap(inputStr.getBytes()));
-
-		discoWorker.requestTask();
-		final ReadableByteChannel mapInput = discoWorker.getMapInput();
-
-		final String input = DiscoUtils.channelLineToString(mapInput);
-
-		assertEquals("eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD", input);
-	}
-
-	@Test
-	public void testChannelToString() throws IOException {
-		final Pipe pipe = Pipe.open();
-
-		final SinkChannel sink = pipe.sink();
-		final String expected = "12345678\n";
-		sink.write(ByteBuffer.wrap(expected.getBytes()));
-
-		final String actual = DiscoUtils.channelLineToString(pipe.source());
-
-		assertEquals(expected, actual);
-	}
-
-	@Test
-	public void testB64Encoding() {
-		final String json = "{\"entryexitlevel\":[\"DEC\",1.5],\"qr2\":[\"DEC\",1],\"qr1\":[\"DEC\",1.0E-5],\"enterexitwindow\":[\"STR\",\"'2 hours'\"],\"VarientNumber\":1}";
-		final String expected = "raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVDIiwxLjBFLTVdLCJlbnRlcmV4aXR3aW5kb3ciOlsiU1RSIiwiJzIgaG91cnMnIl0sIlZhcmllbnROdW1iZXIiOjF9";
-
-		final String actual = DiscoUtils.encodeRaw(json);
-		assertEquals(expected, actual);
-	}
-
-	@Test
-	public void testB64Decoding() throws IOException {
-		final String raw = "eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVDIiwxLjBFLTVdLCJlbnRlcmV4aXR3aW5kb3ciOlsiU1RSIiwiJzIgaG91cnMnIl0sIlZhcmllbnROdW1iZXIiOjF9";
-		final String expected = "{\"entryexitlevel\":[\"DEC\",1.5],\"qr2\":[\"DEC\",1],\"qr1\":[\"DEC\",1.0E-5],\"enterexitwindow\":[\"STR\",\"'2 hours'\"],\"VarientNumber\":1}";
-
-		final String actual = DiscoUtils.decodeRaw(raw);
-		assertEquals(expected, actual);
-	}
-
-	private class NullByteChannel implements WritableByteChannel {
-		@Override
-		public boolean isOpen() {
-			return true;
-		}
-
-		@Override
-		public void close() throws IOException {}
-
-		@Override
-		public int write(final ByteBuffer src) throws IOException {
-			final int remaining = src.remaining();
-			src.clear();
-			return remaining;
-		}
-	}
-
-}

test/com/allstontrading/fx/rd/disco/worker/EncodersTest.java

-package com.allstontrading.fx.rd.disco.worker;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.discoproject.worker.protocol.encoder.DoneEncoder;
-import org.discoproject.worker.protocol.encoder.ErrorEncoder;
-import org.discoproject.worker.protocol.encoder.FatalEncoder;
-import org.discoproject.worker.protocol.encoder.InputErrorEncoder;
-import org.discoproject.worker.protocol.encoder.MessageEncoder;
-import org.discoproject.worker.protocol.encoder.OutputEncoder;
-import org.discoproject.worker.protocol.encoder.PingEncoder;
-import org.discoproject.worker.protocol.encoder.RequestInputsEncoder;
-import org.discoproject.worker.protocol.encoder.RequestTaskEncoder;
-import org.discoproject.worker.protocol.encoder.WorkerAnnounceEncoder;
-import org.discoproject.worker.protocol.encoder.types.OutputType;
-import org.junit.Test;
-
-public class EncodersTest {
-
-	@Test
-	public void testDoneEncoder() {
-		final DoneEncoder encoder = new DoneEncoder();
-		final String encodedString = encoder.toString();
-
-		assertEquals("DONE 2 \"\"\n", encodedString);
-	}
-
-	@Test
-	public void testErrorEncoder() {
-		final ErrorEncoder encoder = new ErrorEncoder();
-		final String errorMessage = "The error message";
-		encoder.set(errorMessage);
-		final String encodedString = encoder.toString();
-
-		assertEquals("ERROR 19 \"The error message\"\n", encodedString);
-	}
-
-	@Test
-	public void testFatalEncoder() {
-		final FatalEncoder encoder = new FatalEncoder();
-		final String errorMessage = "The fatal message";
-		encoder.set(errorMessage);
-		final String encodedString = encoder.toString();
-
-		assertEquals("FATAL 19 \"The fatal message\"\n", encodedString);
-	}
-
-	@Test
-	public void testInputErrorEncoder() {
-		final InputErrorEncoder encoder = new InputErrorEncoder();
-		final String inputId = "inputId";
-		final List<String> repIds = new ArrayList<String>();
-		repIds.add("1");
-		repIds.add("2");
-		repIds.add("3");
-		repIds.add("4");
-		repIds.add("5");
-		encoder.set(inputId, repIds);
-		final String encodedString = encoder.toString();
-		assertEquals("INPUT_ERR 33 [\"inputId\",[\"1\",\"2\",\"3\",\"4\",\"5\"]]\n", encodedString);
-	}
-
-	@Test
-	public void testMessageEncoder() {
-		final MessageEncoder encoder = new MessageEncoder();
-		final String message = "The message";
-		encoder.set(message);
-		final String encodedString = encoder.toString();
-		assertEquals("MSG 13 \"The message\"\n", encodedString);
-	}
-
-	@Test
-	public void testOutputEncoder() {
-		final OutputEncoder encoder = new OutputEncoder();
-		final File jobHome = new File("dir/jobHome/");
-		final File outputLocation = new File("dir/outputLocation/");
-		final OutputType outputType = OutputType.disco;
-		final String label = "thelabel";
-		encoder.set(jobHome, outputLocation, outputType, label);
-		final String encodedString = encoder.toString();
-		assertEquals("OUTPUT 41 [\"dir/outputLocation\",\"disco\",\"thelabel\"]\n", encodedString);
-	}
-
-	@Test
-	public void testPingEncoder() {
-		final PingEncoder encoder = new PingEncoder();
-		final String encodedString = encoder.toString();
-
-		assertEquals("PING 2 \"\"\n", encodedString);
-	}
-
-	@Test
-	public void testRequestInputsEncoder() {
-		final RequestInputsEncoder encoder = new RequestInputsEncoder();
-		final String encodedString = encoder.toString();
-
-		assertEquals("INPUT 2 \"\"\n", encodedString);
-	}
-
-	@Test
-	public void testRequestTaskEncoder() {
-		final RequestTaskEncoder encoder = new RequestTaskEncoder();
-		final String encodedString = encoder.toString();
-
-		assertEquals("TASK 2 \"\"\n", encodedString);
-	}
-
-	@Test
-	public void testWorkerAnnounceEncoder() {
-		final WorkerAnnounceEncoder encoder = new WorkerAnnounceEncoder();
-		encoder.set("version", 12345);
-		final String encodedString = encoder.toString();
-		assertEquals("WORKER 33 {\"pid\":12345,\"version\":\"version\"}\n", encodedString);
-	}
-
-}

test/com/allstontrading/fx/rd/disco/worker/HeaderDecoderTest.java

-package com.allstontrading.fx.rd.disco.worker;
-
-import static org.junit.Assert.*;
-
-import java.nio.ByteBuffer;
-
-import org.discoproject.worker.protocol.decoder.HeaderDecoder;
-import org.discoproject.worker.protocol.decoder.types.ResponseMessageName;
-import org.junit.Test;
-
-
-/**
- * @author Luke Hoersten <lhoersten@allstontrading.com>
- * 
- */
-public class HeaderDecoderTest {
-
-	@Test
-	public void testPartialHeader() {
-		final HeaderDecoder headerDecoder = new HeaderDecoder();
-
-		final String msg = "WORKER 2";
-		assertFalse(headerDecoder.isFullHeader(ByteBuffer.wrap(msg.getBytes())));
-
-		final String msg1 = "WORK";
-		assertFalse(headerDecoder.isFullHeader(ByteBuffer.wrap(msg1.getBytes())));
-	}
-
-	@Test
-	public void testFullHeader() {
-		final String msg = "WORKER 29 {\"pid\":25094,\"version\":\"1.0\"}\n";
-		final HeaderDecoder headerDecoder = new HeaderDecoder();
-		assertTrue(headerDecoder.isFullHeader(ByteBuffer.wrap(msg.getBytes())));
-	}
-
-	@Test
-	public void testMessageName() {
-		final String msg = "INPUT 29 {\"pid\":25094,\"version\":\"1.0\"}\n";
-		final HeaderDecoder headerDecoder = new HeaderDecoder();
-		assertEquals(ResponseMessageName.INPUT, headerDecoder.parse(ByteBuffer.wrap(msg.getBytes())).getMessageName());
-	}
-
-	@Test
-	public void testPayloadLength() {
-		final String msg = "INPUT 29 {\"pid\":25094,\"version\":\"1.0\"}\n";
-		final HeaderDecoder headerDecoder = new HeaderDecoder();
-		assertEquals(29, headerDecoder.parse(ByteBuffer.wrap(msg.getBytes())).getPayloadLength());
-	}
-
-}

test/org/discoproject/test/DecodersTest.java

+package org.discoproject.test;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.discoproject.worker.protocol.decoder.FailDecoder;
+import org.discoproject.worker.protocol.decoder.HeaderDecoder;
+import org.discoproject.worker.protocol.decoder.InputDecoder;
+import org.discoproject.worker.protocol.decoder.RetryDecoder;
+import org.discoproject.worker.protocol.decoder.WaitDecoder;
+import org.discoproject.worker.protocol.decoder.types.DiscoInput;
+import org.discoproject.worker.protocol.decoder.types.DiscoInputReplica;
+import org.discoproject.worker.protocol.decoder.types.DiscoInputReplicaProtocol;
+import org.discoproject.worker.protocol.decoder.types.DiscoInputStatus;
+import org.junit.Test;
+
+public class DecodersTest {
+
+	@Test
+	public void testFailDecoder() {
+		final HeaderDecoder headerDecoder = new HeaderDecoder();
+		final FailDecoder decoder = new FailDecoder();
+
+		final String inputStr = "FAIL 16 [1234,[5,6,7,8]]\n";
+		final ByteBuffer bb = ByteBuffer.wrap(inputStr.getBytes());
+
+		decoder.parse(bb, headerDecoder.parse(bb).getPayloadLength());
+
+		assertEquals(1234, decoder.getInputId());
+		final List<Integer> replicaIds = decoder.getReplicaIds();
+		for (int i = 0; i < 4; i++) {
+			assertEquals((i + 5), replicaIds.get(i).intValue());
+		}
+	}
+
+	@Test
+	public void testRetryDecoder() {
+		final HeaderDecoder headerDecoder = new HeaderDecoder();
+		final RetryDecoder decoder = new RetryDecoder();
+
+		final String inputStr = "RETRY 33 [[0,\"location0\"],[1,\"location1\"]]\n";
+		final ByteBuffer bb = ByteBuffer.wrap(inputStr.getBytes());
+
+		decoder.parse(bb, headerDecoder.parse(bb).getPayloadLength());
+		final List<DiscoInputReplica> replicas = decoder.getReplicas();
+	}
+
+	@Test
+	public void testWaitDecoder() {
+		final HeaderDecoder headerDecoder = new HeaderDecoder();
+		final WaitDecoder decoder = new WaitDecoder();
+
+		final String inputStr = "WAIT 3 100\n";
+		final ByteBuffer bb = ByteBuffer.wrap(inputStr.getBytes());
+
+		decoder.parse(bb, headerDecoder.parse(bb).getPayloadLength());
+		assertEquals(100, decoder.getPauseSeconds());
+	}
+
+	@Test
+	public void testInputDecoder() {
+		final HeaderDecoder headerDecoder = new HeaderDecoder();
+		final InputDecoder inputDecoder = new InputDecoder();
+
+		final String inputStr = "INPUT 110 [\"done\",[[0,\"ok\",[[0,\"raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD\"]]]]]\n";
+		final ByteBuffer bb = ByteBuffer.wrap(inputStr.getBytes());
+
+		inputDecoder.parse(bb, headerDecoder.parse(bb).getPayloadLength());
+
+		assertTrue(inputDecoder.isDone());
+
+		final List<DiscoInput> inputs = inputDecoder.getInputs();
+		final DiscoInput input = inputs.get(0);
+		assertEquals(0, input.getId());
+		assertEquals(DiscoInputStatus.ok, input.getStatus());
+
+		final List<DiscoInputReplica> replicas = input.getReplicas();
+		final DiscoInputReplica replica = replicas.get(0);
+
+		assertEquals(0, replica.getId());
+		assertEquals("raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD", replica.getURI().toString());
+		assertEquals(DiscoInputReplicaProtocol.raw, replica.getScheme());
+	}
+
+}

test/org/discoproject/test/DiscoJobPackTest.java

+package org.discoproject.test;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.discoproject.job.jobpack.DiscoJobPack;
+import org.discoproject.utils.JsonUtils;
+import org.junit.Test;
+
+public class DiscoJobPackTest {
+
+	@Test
+	public void testDiscoJobPack() throws URISyntaxException {
+
+		final DiscoJobPack jobPack = new DiscoJobPack("jobName", new File("jobPackFile.txt"), true, true, new File("jobHome"), new byte[] {
+		        1, 2, 3, 4 });
+
+		jobPack.addEnvVar("env1", "value1");
+		jobPack.addEnvVar("env2", "value2");
+
+		jobPack.addInput(new URI("raw://therawuriinput"));
+		jobPack.addInput(new URI("raw://therawuriinput2"));
+
+		final ByteBuffer buffer = ByteBuffer.allocate(1024);
+		jobPack.write(buffer);
+		buffer.flip();
+
+		buffer.getInt(); // magic!
+		final int jobDictOffset = buffer.getInt();
+		final int jobEnvsOffset = buffer.getInt();
+		final int jobHomeOffset = buffer.getInt();
+
+		buffer.position(128);
+		final String dictJson = new String(buffer.array(), jobDictOffset, jobEnvsOffset - jobDictOffset);
+		final String envsJson = new String(buffer.array(), jobEnvsOffset, jobHomeOffset - jobEnvsOffset);
+
+		final Map<String, String> dictObj = JsonUtils.asObject(dictJson);
+		assertEquals("jobPackFile.txt", JsonUtils.asString("worker", dictObj));
+		assertEquals("true", JsonUtils.asString("map?", dictObj));
+		assertEquals("true", JsonUtils.asString("reduce?", dictObj));
+		assertEquals("1", JsonUtils.asString("nr_reduces", dictObj));
+		assertEquals("jobName", JsonUtils.asString("prefix", dictObj));
+		assertEquals("max_cores", JsonUtils.asString("scheduler", dictObj));
+
+		final List<String> inputs = JsonUtils.asArray(dictObj.get("input"));
+		assertEquals(2, inputs.size());
+		assertEquals("raw://therawuriinput", JsonUtils.asString(0, inputs));
+		assertEquals("raw://therawuriinput2", JsonUtils.asString(1, inputs));
+		final Map<String, String> envObj = JsonUtils.asObject(envsJson);
+		assertEquals("value1", JsonUtils.asString("env1", envObj));
+		assertEquals("value2", JsonUtils.asString("env2", envObj));
+	}
+
+}

test/org/discoproject/test/DiscoTest.java

+package org.discoproject.test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Pipe;
+import java.nio.channels.Pipe.SinkChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import org.discoproject.DiscoUtils;
+import org.discoproject.worker.DiscoWorker;
+import org.discoproject.worker.protocol.DiscoIOChannel;
+import org.discoproject.worker.protocol.decoder.DiscoWorkerDecoder;
+import org.discoproject.worker.protocol.decoder.DiscoWorkerListener;
+import org.discoproject.worker.protocol.decoder.HeaderDecoder;
+import org.discoproject.worker.protocol.decoder.InputDecoder;
+import org.discoproject.worker.protocol.decoder.types.DiscoInput;
+import org.discoproject.worker.protocol.decoder.types.DiscoInputReplica;
+import org.discoproject.worker.protocol.decoder.types.DiscoInputReplicaProtocol;
+import org.discoproject.worker.protocol.decoder.types.DiscoInputStatus;
+import org.discoproject.worker.protocol.encoder.WorkerAnnounceEncoder;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+/**
+ * @author Luke Hoersten <lhoersten@allstontrading.com>
+ * 
+ */
+public class DiscoTest {
+
+	@Test
+	public void testWorkerAnnounceEncode() {
+		final String expected = "WORKER 29 {\"pid\":25094,\"version\":\"1.0\"}\n";
+		assertEquals(expected, new WorkerAnnounceEncoder().set("1.0", 25094).toString());
+	}
+
+	@Test
+	public void testWorkerAnnounce() throws IOException {
+		final DiscoWorkerListener listener = mock(DiscoWorkerListener.class);
+		final InOrder inOrder = inOrder(listener);
+
+		final Pipe pipeFromDisco = Pipe.open();
+		final DiscoIOChannel ioChannel = new DiscoIOChannel(pipeFromDisco.source(), new NullByteChannel(),
+		        new DiscoWorkerDecoder().setListener(listener));
+
+		final String okStr = "OK 4 \"ok\"\n";
+		pipeFromDisco.sink().write(ByteBuffer.wrap(okStr.getBytes()));
+		ioChannel.write(new WorkerAnnounceEncoder().set("1.0", 555));
+
+		inOrder.verify(listener).ok();
+	}
+
+	@Test
+	public void testInputDecoder() {
+		final HeaderDecoder headerDecoder = new HeaderDecoder();
+		final InputDecoder inputDecoder = new InputDecoder();
+
+		final String inputStr = "INPUT 110 [\"done\",[[0,\"ok\",[[0,\"raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD\"]]]]]\n";
+		final ByteBuffer bb = ByteBuffer.wrap(inputStr.getBytes());
+
+		inputDecoder.parse(bb, headerDecoder.parse(bb).getPayloadLength());
+
+		assertTrue(inputDecoder.isDone());
+
+		final List<DiscoInput> inputs = inputDecoder.getInputs();
+		final DiscoInput input = inputs.get(0);
+		assertEquals(0, input.getId());
+		assertEquals(DiscoInputStatus.ok, input.getStatus());
+
+		final List<DiscoInputReplica> replicas = input.getReplicas();
+		final DiscoInputReplica replica = replicas.get(0);
+
+		assertEquals(0, replica.getId());
+		assertEquals("raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD", replica.getURI().toString());
+		assertEquals(DiscoInputReplicaProtocol.raw, replica.getScheme());
+	}
+
+	@Test
+	public void testInputFlow() throws IOException {
+		final Pipe pipeFromDisco = Pipe.open();
+
+		final DiscoWorker discoWorker = new DiscoWorker(pipeFromDisco.source(), new NullByteChannel());
+		final SinkChannel disco = pipeFromDisco.sink();
+
+		// Put messages in the input pipeline (from the disco master)
+		final String okStr = "OK 4 \"ok\"\n";
+		disco.write(ByteBuffer.wrap(okStr.getBytes()));
+		final String taskStr = "TASK 327 {\"taskid\":0,\"master\":\"http://lhoersten-66113:8989\",\"disco_port\":8989,\"put_port\":8990,\"ddfs_data\":\"/srv/disco/ddfs\",\"disco_data\":\"/srv/disco/data\",\"mode\":\"map\",\"jobfile\":\"/srv/disco/data/localhost/5a/lhoersten-FunshineSimulator@524:7310e:1cb44/jobfile\",\"jobname\":\"lhoersten-FunshineSimulator@524:7310e:1cb44\",\"host\":\"localhost\"}\n";
+		disco.write(ByteBuffer.wrap(taskStr.getBytes()));
+		final String inputStr = "INPUT 110 [\"done\",[[0,\"ok\",[[0,\"raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD\"]]]]]\n";
+		disco.write(ByteBuffer.wrap(inputStr.getBytes()));
+
+		discoWorker.requestTask();
+		final ReadableByteChannel mapInput = discoWorker.getMapInput();
+
+		final String input = DiscoUtils.channelLineToString(mapInput);
+
+		assertEquals("eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVD", input);
+	}
+
+	@Test
+	public void testChannelToString() throws IOException {
+		final Pipe pipe = Pipe.open();
+
+		final SinkChannel sink = pipe.sink();
+		final String expected = "12345678\n";
+		sink.write(ByteBuffer.wrap(expected.getBytes()));
+
+		final String actual = DiscoUtils.channelLineToString(pipe.source());
+
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testB64Encoding() {
+		final String json = "{\"entryexitlevel\":[\"DEC\",1.5],\"qr2\":[\"DEC\",1],\"qr1\":[\"DEC\",1.0E-5],\"enterexitwindow\":[\"STR\",\"'2 hours'\"],\"VarientNumber\":1}";
+		final String expected = "raw://eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVDIiwxLjBFLTVdLCJlbnRlcmV4aXR3aW5kb3ciOlsiU1RSIiwiJzIgaG91cnMnIl0sIlZhcmllbnROdW1iZXIiOjF9";
+
+		final String actual = DiscoUtils.encodeRaw(json);
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testB64Decoding() throws IOException {
+		final String raw = "eyJlbnRyeWV4aXRsZXZlbCI6WyJERUMiLDEuNV0sInFyMiI6WyJERUMiLDFdLCJxcjEiOlsiREVDIiwxLjBFLTVdLCJlbnRlcmV4aXR3aW5kb3ciOlsiU1RSIiwiJzIgaG91cnMnIl0sIlZhcmllbnROdW1iZXIiOjF9";
+		final String expected = "{\"entryexitlevel\":[\"DEC\",1.5],\"qr2\":[\"DEC\",1],\"qr1\":[\"DEC\",1.0E-5],\"enterexitwindow\":[\"STR\",\"'2 hours'\"],\"VarientNumber\":1}";
+
+		final String actual = DiscoUtils.decodeRaw(raw);
+		assertEquals(expected, actual);
+	}
+
+	private class NullByteChannel implements WritableByteChannel {
+		@Override
+		public boolean isOpen() {
+			return true;
+		}
+
+		@Override
+		public void close() throws IOException {}
+
+		@Override
+		public int write(final ByteBuffer src) throws IOException {
+			final int remaining = src.remaining();
+			src.clear();
+			return remaining;
+		}
+	}
+
+}

test/org/discoproject/test/EncodersTest.java

+package org.discoproject.test;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.discoproject.worker.protocol.encoder.DoneEncoder;
+import org.discoproject.worker.protocol.encoder.ErrorEncoder;
+import org.discoproject.worker.protocol.encoder.FatalEncoder;
+import org.discoproject.worker.protocol.encoder.InputErrorEncoder;
+import org.discoproject.worker.protocol.encoder.MessageEncoder;
+import org.discoproject.worker.protocol.encoder.OutputEncoder;
+import org.discoproject.worker.protocol.encoder.PingEncoder;
+import org.discoproject.worker.protocol.encoder.RequestInputsEncoder;
+import org.discoproject.worker.protocol.encoder.RequestTaskEncoder;
+import org.discoproject.worker.protocol.encoder.WorkerAnnounceEncoder;
+import org.discoproject.worker.protocol.encoder.types.OutputType;
+import org.junit.Test;
+
+public class EncodersTest {
+
+	@Test
+	public void testDoneEncoder() {
+		final DoneEncoder encoder = new DoneEncoder();
+		final String encodedString = encoder.toString();
+
+		assertEquals("DONE 2 \"\"\n", encodedString);
+	}
+
+	@Test
+	public void testErrorEncoder() {
+		final ErrorEncoder encoder = new ErrorEncoder();
+		final String errorMessage = "The error message";
+		encoder.set(errorMessage);
+		final String encodedString = encoder.toString();
+
+		assertEquals("ERROR 19 \"The error message\"\n", encodedString);
+	}
+
+	@Test
+	public void testFatalEncoder() {
+		final FatalEncoder encoder = new FatalEncoder();
+		final String errorMessage = "The fatal message";
+		encoder.set(errorMessage);
+		final String encodedString = encoder.toString();
+
+		assertEquals("FATAL 19 \"The fatal message\"\n", encodedString);
+	}
+
+	@Test
+	public void testInputErrorEncoder() {
+		final InputErrorEncoder encoder = new InputErrorEncoder();
+		final String inputId = "inputId";
+		final List<String> repIds = new ArrayList<String>();
+		repIds.add("1");
+		repIds.add("2");
+		repIds.add("3");
+		repIds.add("4");
+		repIds.add("5");
+		encoder.set(inputId, repIds);
+		final String encodedString = encoder.toString();
+		assertEquals("INPUT_ERR 33 [\"inputId\",[\"1\",\"2\",\"3\",\"4\",\"5\"]]\n", encodedString);
+	}
+
+	@Test
+	public void testMessageEncoder() {
+		final MessageEncoder encoder = new MessageEncoder();
+		final String message = "The message";
+		encoder.set(message);
+		final String encodedString = encoder.toString();
+		assertEquals("MSG 13 \"The message\"\n", encodedString);
+	}
+
+	@Test
+	public void testOutputEncoder() {
+		final OutputEncoder encoder = new OutputEncoder();
+		final File jobHome = new File("dir/jobHome/");
+		final File outputLocation = new File("dir/outputLocation/");
+		final OutputType outputType = OutputType.disco;
+		final String label = "thelabel";
+		encoder.set(jobHome, outputLocation, outputType, label);
+		final String encodedString = encoder.toString();
+		assertEquals("OUTPUT 41 [\"dir/outputLocation\",\"disco\",\"thelabel\"]\n", encodedString);
+	}
+
+	@Test
+	public void testPingEncoder() {
+		final PingEncoder encoder = new PingEncoder();
+		final String encodedString = encoder.toString();
+
+		assertEquals("PING 2 \"\"\n", encodedString);
+	}
+
+	@Test
+	public void testRequestInputsEncoder() {
+		final RequestInputsEncoder encoder = new RequestInputsEncoder();
+		final String encodedString = encoder.toString();
+
+		assertEquals("INPUT 2 \"\"\n", encodedString);
+	}
+
+	@Test
+	public void testRequestTaskEncoder() {
+		final RequestTaskEncoder encoder = new RequestTaskEncoder();
+		final String encodedString = encoder.toString();
+
+		assertEquals("TASK 2 \"\"\n", encodedString);
+	}
+
+	@Test
+	public void testWorkerAnnounceEncoder() {
+		final WorkerAnnounceEncoder encoder = new WorkerAnnounceEncoder();
+		encoder.set("version", 12345);
+		final String encodedString = encoder.toString();
+		assertEquals("WORKER 33 {\"pid\":12345,\"version\":\"version\"}\n", encodedString);
+	}
+
+}

test/org/discoproject/test/HeaderDecoderTest.java

+package org.discoproject.test;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.discoproject.worker.protocol.decoder.HeaderDecoder;
+import org.discoproject.worker.protocol.decoder.types.ResponseMessageName;
+import org.junit.Test;
+
+
+/**
+ * @author Luke Hoersten <lhoersten@allstontrading.com>
+ * 
+ */
+public class HeaderDecoderTest {
+
+	@Test
+	public void testPartialHeader() {
+		final HeaderDecoder headerDecoder = new HeaderDecoder();
+
+		final String msg = "WORKER 2";
+		assertFalse(headerDecoder.isFullHeader(ByteBuffer.wrap(msg.getBytes())));
+
+		final String msg1 = "WORK";
+		assertFalse(headerDecoder.isFullHeader(ByteBuffer.wrap(msg1.getBytes())));
+	}
+
+	@Test
+	public void testFullHeader() {
+		final String msg = "WORKER 29 {\"pid\":25094,\"version\":\"1.0\"}\n";
+		final HeaderDecoder headerDecoder = new HeaderDecoder();
+		assertTrue(headerDecoder.isFullHeader(ByteBuffer.wrap(msg.getBytes())));
+	}
+
+	@Test
+	public void testMessageName() {
+		final String msg = "INPUT 29 {\"pid\":25094,\"version\":\"1.0\"}\n";
+		final HeaderDecoder headerDecoder = new HeaderDecoder();
+		assertEquals(ResponseMessageName.INPUT, headerDecoder.parse(ByteBuffer.wrap(msg.getBytes())).getMessageName());
+	}
+
+	@Test
+	public void testPayloadLength() {
+		final String msg = "INPUT 29 {\"pid\":25094,\"version\":\"1.0\"}\n";
+		final HeaderDecoder headerDecoder = new HeaderDecoder();
+		assertEquals(29, headerDecoder.parse(ByteBuffer.wrap(msg.getBytes())).getPayloadLength());
+	}
+
+}