Commits

mark...@ucop.edu  committed 4ceb537

handle ZK connection loss when adding node to queue

  • Participants
  • Parent commits a8dc21b

Comments (0)

Files changed (1)

File src/main/java/org/cdlib/mrt/queue/DistributedQueue.java

 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+
 
 
 /**
         }
     }
         
-    public boolean submit(byte[] data) throws KeeperException, InterruptedException{
+    public boolean submit(byte[] data) throws KeeperException, InterruptedException, ConnectionLossException{
+	int attempts = 0;
+	String node = null;
         while (true) {
+            Item i = new Item(Item.PENDING, data,  new Date());
             try {
-                Item i = new Item(Item.PENDING, data,  new Date());
-                zookeeper.create(this.name, i.getBytes(), 
+                node = zookeeper.create(this.name, i.getBytes(), 
                                  acl, CreateMode.PERSISTENT_SEQUENTIAL);
                 return true;
+            } catch (ConnectionLossException cle) {
+		// did submit fail?
+		if (attempts >= 3) throw new ConnectionLossException();  
+		if (node == null) {  
+                    System.err.println("[error] DistributedQueue.submit lost connection, requeuing: " + cle.getMessage());
+		    attempts++;
+		} else {
+		    return true;
+		}
             } catch (KeeperException.NoNodeException e){
                 zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
             }