Commits

Jason Brown committed 262e006

Add retry mechanism to OTC for non-droppable_verbs
patch by jasobrown; reviewed by jbellis for CASSANDRA-5393

  • Participants
  • Parent commits 9fc071a

Comments (0)

Files changed (1)

src/java/org/apache/cassandra/net/OutboundTcpConnection.java

         expireMessages();
         try
         {
-            backlog.put(new QueuedMessage(message, id, System.currentTimeMillis()));
+            backlog.put(new QueuedMessage(message, id));
         }
         catch (InterruptedException e)
         {
         }
         catch (Exception e)
         {
-            // Non IO exceptions is likely a programming error so let's not silence it
-            if (!(e instanceof IOException))
-                logger.error("error writing to " + poolReference.endPoint(), e);
-            else if (logger.isDebugEnabled())
-                logger.debug("error writing to " + poolReference.endPoint(), e);
             disconnect();
+            if (e instanceof IOException)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("error writing to " + poolReference.endPoint(), e);
+
+                // if the message was important, such as a repair acknowledgement, put it back on the queue
+                // to retry after re-connecting.  See CASSANDRA-5393
+                if (e instanceof SocketException && qm.shouldRetry())
+                {
+                    try
+                    {
+                        backlog.put(new RetriedQueuedMessage(qm));
+                    }
+                    catch (InterruptedException e1)
+                    {
+                        throw new AssertionError(e1);
+                    }
+                }
+            }
+            else
+            {
+                // Non IO exceptions are likely a programming error so let's not silence them
+                logger.error("error writing to " + poolReference.endPoint(), e);
+            }
         }
     }
 
         }
     }
 
+    /** messages that have not been retried yet */
     private static class QueuedMessage
     {
         final MessageOut<?> message;
         final String id;
         final long timestamp;
 
-        QueuedMessage(MessageOut<?> message, String id, long timestamp)
+        QueuedMessage(MessageOut<?> message, String id)
         {
             this.message = message;
             this.id = id;
-            this.timestamp = timestamp;
+            this.timestamp = System.currentTimeMillis();
+        }
+
+        boolean shouldRetry()
+        {
+            return MessagingService.DROPPABLE_VERBS.contains(message.verb);
+        }
+    }
+
+    private static class RetriedQueuedMessage extends QueuedMessage
+    {
+        RetriedQueuedMessage(QueuedMessage msg)
+        {
+            super(msg.message, msg.id);
+        }
+
+        boolean shouldRetry()
+        {
+            return false;
         }
     }
 }