Commits

Anonymous committed 789cd06 Merge

merge from 1.2

Comments (0)

Files changed (4)

  * upgrade thrift to 0.9.0 (CASSANDRA-3719)
 
 1.2.1
+ * stream undelivered hints on decommission (CASSANDRA-5128)
  * GossipingPropertyFileSnitch loads saved dc/rack info if needed (CASSANDRA-5133)
  * drain should flush system CFs too (CASSANDRA-4446)
  * add inter_dc_tcp_nodelay setting (CASSANDRA-5148)

src/java/org/apache/cassandra/locator/TokenMetadata.java

 
     public TokenMetadata() // no-arg constructor: delegates to the three-argument constructor
     {
-        this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp), new Topology());
+        this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
+             HashBiMap.<InetAddress, UUID>create(), // newly created endpoint-to-host-id map, now passed explicitly
+             new Topology());
     }
 
-    public TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, Topology topology)
+    private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology) // now private and takes the endpoint-to-host-id map as a parameter
     {
         this.tokenToEndpointMap = tokenToEndpointMap;
         this.topology = topology;
-        endpointToHostIdMap = HashBiMap.create();
+        endpointToHostIdMap = endpointsMap; // stored as given; no copy is made in this constructor
         sortedTokens = sortTokens(); // sortTokens() is defined outside this excerpt
     }
 
         lock.readLock().lock();
         try
         {
-            return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp), new Topology(topology));
+            return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
+                                     HashBiMap.create(endpointToHostIdMap),
+                                     new Topology(topology));
         }
         finally
         {
         }
     }
 
+    public Set<InetAddress> getAllEndpoints() // returns the endpoints that are keys of endpointToHostIdMap
+    {
+        return endpointToHostIdMap.keySet(); // NOTE(review): the map's own key set is returned, with no copy and no lock taken in this method -- presumably callers must treat it as read-only; confirm
+    }
+
     /** caller should not modify leavingEndpoints */
     public Set<InetAddress> getLeavingEndpoints()
     {

src/java/org/apache/cassandra/service/StorageService.java

         setMode(Mode.LEAVING, "streaming data to other nodes", true);
 
         CountDownLatch latch = streamRanges(rangesToStream);
+        CountDownLatch hintsLatch = streamHints();
 
         // wait for the transfer runnables to signal the latch.
         logger.debug("waiting for stream aks.");
         try
         {
             latch.await();
+            hintsLatch.await();
         }
         catch (InterruptedException e)
         {
         onFinish.run();
     }
 
+    private CountDownLatch streamHints() // starts streaming the hints column family to one live peer; returns a latch the stream callback counts down
+    {
+        if (HintedHandOffManager.instance.listEndpointsPendingHints().size() == 0)
+            return new CountDownLatch(0); // nothing pending: a zero-count latch, so await() on it returns immediately
+
+        // gather all live nodes in the cluster that aren't also leaving
+        List<InetAddress> candidates = new ArrayList<InetAddress>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints()); // copied into a mutable list so entries can be removed below
+        candidates.remove(FBUtilities.getBroadcastAddress()); // never choose this node as its own destination
+        for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); )
+        {
+            InetAddress address = iter.next();
+            if (!FailureDetector.instance.isAlive(address)) // drop peers the failure detector does not report as alive
+                iter.remove();
+        }
+
+        if (candidates.isEmpty())
+        {
+            logger.warn("Unable to stream hints since no live endpoints seen");
+            return new CountDownLatch(0); // no destination: nothing is streamed and the returned latch is already released
+        }
+        else
+        {
+            // stream to the closest peer as chosen by the snitch
+            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates);
+            InetAddress hintsDestinationHost = candidates.get(0); // first entry after the sort -- assumes sortByProximity reorders the list in place; confirm
+
+            // stream all hints -- range list will be a singleton of "the entire ring"
+            Token token = StorageService.getPartitioner().getMinimumToken();
+            List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token));
+
+            CountDownLatch latch = new CountDownLatch(1); // exactly one transfer is started, hence a count of one
+            StreamOut.transferRanges(hintsDestinationHost,
+                                     Table.open(Table.SYSTEM_KS),
+                                     Collections.singletonList(Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF)), // only the hints column family store is passed
+                                     ranges,
+                                     new CountingDownStreamCallback(latch, hintsDestinationHost),
+                                     OperationType.UNBOOTSTRAP);
+            return latch;
+        }
+    }
+
     public void move(String newToken) throws IOException
     {
         try
                 final List<Range<Token>> ranges = rangesEntry.getValue();
                 final InetAddress newEndpoint = rangesEntry.getKey();
 
-                final IStreamCallback callback = new IStreamCallback()
-                {
-                    public void onSuccess()
-                    {
-                        latch.countDown();
-                    }
-
-                    public void onFailure()
-                    {
-                        logger.warn("Streaming to " + newEndpoint + " failed");
-                        onSuccess(); // calling onSuccess for latch countdown
-                    }
-                };
-
                 // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
-                StreamOut.transferRanges(newEndpoint, Table.open(table), ranges, callback, OperationType.UNBOOTSTRAP);
+                StreamOut.transferRanges(newEndpoint,
+                                         Table.open(table),
+                                         ranges,
+                                         new CountingDownStreamCallback(latch, newEndpoint),
+                                         OperationType.UNBOOTSTRAP);
             }
         }
         return latch;
     }
 
+    class CountingDownStreamCallback implements IStreamCallback // stream callback that counts a latch down once per finished transfer, on success and on failure alike
+    {
+        private final CountDownLatch latch; // latch counted down when the transfer finishes
+        private final InetAddress targetAddr; // destination endpoint; used only in the failure log message
+
+        CountingDownStreamCallback(CountDownLatch latch, InetAddress targetAddr)
+        {
+            this.latch = latch;
+            this.targetAddr = targetAddr;
+        }
+
+        public void onSuccess()
+        {
+            latch.countDown();
+        }
+
+        public void onFailure() // a failure is logged as a warning and then handled exactly like a success
+        {
+            logger.warn("Streaming to " + targetAddr + " failed");
+            onSuccess(); // calling onSuccess for latch countdown
+        }
+    };
+
     /**
      * Used to request ranges from endpoints in the ring (will block until all data is fetched and ready)
      * @param ranges ranges to fetch as map of the preferred address and range collection

src/java/org/apache/cassandra/streaming/StreamOut.java

     */
     public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
     {
+        transferRanges(target, table, table.getColumnFamilyStores(), ranges, callback, type); // delegates to the overload taking an explicit CF list, passing every column family store of the table
+    }
+
+    /**
+     * Stream the given ranges to the target endpoint for provided CFs in the given keyspace.
+     *
+     * Creates a StreamOutSession from the table name, the target and the callback, then passes
+     * that session to the session-based transferRanges overload together with cfses, ranges and type.
+     *
+     * @param target   endpoint the session is created for
+     * @param table    keyspace; only its name is read here, to create the session
+     * @param cfses    column family stores forwarded to the session-based overload
+     * @param ranges   token ranges forwarded to the session-based overload
+     * @param callback callback handed to StreamOutSession.create
+     * @param type     operation type forwarded to the session-based overload
+     */
+    public static void transferRanges(InetAddress target, Table table, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
+    {
         StreamOutSession session = StreamOutSession.create(table.getName(), target, callback);
-        transferRanges(session, table.getColumnFamilyStores(), ranges, type);
+        transferRanges(session, cfses, ranges, type);
     }
 
     /**