Commits

Anonymous committed 8231c63

fix requeue problem

Comments (0)

Files changed (2)

src/main/java/org/cdlib/mrt/queue/DistributedQueue.java

                 Item newItem = new Item(Item.PENDING, item.getData(), item.getTimestamp());
                 zookeeper.create(this.name, newItem.getBytes(),
                                  acl, CreateMode.PERSISTENT_SEQUENTIAL);
-                delete(item.getId());
+                updateStatus(item.getId(), item.getStatus(), Item.DELETED);
                 return true;
             } catch (KeeperException.NoNodeException e){
                 zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
                 zookeeper.setData(path, newItem.getBytes(), version);
                 return newItem;
             } else {
-                throw new RuntimeException("Bad id.");
+                throw new RuntimeException("Bad status.");
             }
         } catch (KeeperException.BadVersionException ex) {
             // Somebody got here first

src/test/java/org/cdlib/mrt/queueTest/QueueTest.java

 import org.cdlib.mrt.queue.*;
 
 import org.testng.annotations.*;
+import org.testng.Assert;
 
 public class QueueTest {
 
     ZooKeeper zk;
+    DistributedQueue q;
+    
+    final String Q_NAME = "/queueTest";
 
-    DistributedQueue q;
     @BeforeClass
     public void setUp() {
         try {
             this.zk = new ZooKeeper("localhost:2181", 10000, new DistributedQueue.Ignorer());
-            this.q = new DistributedQueue(zk, "/queueTest", null);
+            this.q = new DistributedQueue(zk, Q_NAME, null);
+            for (String child : this.zk.getChildren(Q_NAME, false)) {
+                this.zk.delete(String.format("%s/%s", Q_NAME, child), -1);
+            }
         } catch (IOException ex) {
             assert(false);
+        } catch (InterruptedException ex) {
+            assert(false);
+        } catch (KeeperException ex) {
+            assert(false);
         }
     }
     
             assert(q.submit("hello world".getBytes("UTF-8")));
             Item item = q.consume();
             q.complete(item.getId());
-
         } catch (IOException ex) {
             assert(false);
         } catch (InterruptedException ex) {
           dependsOnMethods = { "canCleanup"})
     public void queueShouldBeEmpty() {
         try {
-            List<String> children = this.zk.getChildren("/queueTest", false);
+            List<String> children = this.zk.getChildren(Q_NAME, false);
             assert(children.size() == 0);
         } catch (KeeperException ex) {
             assert(false);
             assert(false);
         }
     }
+
+    @Test(groups = { "functional" })
+    public void canRequeue() {
+        try {
+            byte[] content1 = "1".getBytes("UTF-8");
+            byte[] content2 = "2".getBytes("UTF-8");
+            Assert.assertTrue(q.submit(content1));
+            Assert.assertTrue(q.submit(content2));
+            Item item = q.consume();
+            Assert.assertEquals(item.getData(), content1);
+            q.requeue(item);
+            item = q.consume();
+            Assert.assertEquals(item.getData(), content2);
+            q.complete(item.getId());
+            item = q.consume();
+            Assert.assertEquals(item.getData(), content1);
+            q.complete(item.getId());
+        } catch (NoSuchElementException ex) {
+            assert(false);
+        } catch (KeeperException ex) {
+            assert(false);
+        } catch (InterruptedException ex) {
+            assert(false);
+        } catch (UnsupportedEncodingException ex) {
+            assert(false);
+        }
+    }
 }