Commits

pcmanus committed 90de8d5 Merge

Merge branch 'cassandra-1.2.0' of https://git-wip-us.apache.org/repos/asf/cassandra into cassandra-1.2.0

Comments (0)

Files changed (19)

  * Exclude gcable tombstones from merkle-tree computation (CASSANDRA-4905)
  * Better printing of AbstractBounds for tracing (CASSANDRA-4931)
  * Optimize mostRecentTomstone check in CC.collectAllData (CASSANDRA-4883)
+ * Change stream session ID to UUID to avoid collision from same node (CASSANDRA-4813)
+ * Use Stats.db when bulk loading if present (CASSANDRA-4957)
 Merged from 1.1:
  * reset getRangeSlice filter after finishing a row for get_paged_slice
    (CASSANDRA-4919)

src/java/org/apache/cassandra/cql3/QueryProcessor.java

     throws RequestExecutionException, RequestValidationException
     {
         ClientState clientState = queryState.getClientState();
-        Tracing.trace("Checking access");
         statement.checkAccess(clientState);
-        Tracing.trace("Validating statement");
         statement.validate(clientState);
-        Tracing.trace("Executing statement");
         ResultMessage result = statement.execute(cl, queryState, variables);
         return result == null ? new ResultMessage.Void() : result;
     }

src/java/org/apache/cassandra/io/sstable/SSTableLoader.java

                 components.add(Component.PRIMARY_INDEX);
                 if (new File(desc.filenameFor(Component.COMPRESSION_INFO)).exists())
                     components.add(Component.COMPRESSION_INFO);
+                if (new File(desc.filenameFor(Component.STATS)).exists())
+                    components.add(Component.STATS);
 
                 try
                 {

src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java

         else
         {
             Socket socket = SocketChannel.open(new InetSocketAddress(endPoint(), DatabaseDescriptor.getStoragePort())).socket();
-            if (Config.getOutboundBindAny())
+            if (Config.getOutboundBindAny() && !socket.isBound())
                 socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
             return socket;
         }

src/java/org/apache/cassandra/streaming/AbstractStreamSession.java

 package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.LoggerFactory;
 {
     private static final Logger logger = LoggerFactory.getLogger(AbstractStreamSession.class);
 
+    protected final InetAddress host;
+    protected final UUID sessionId;
     protected String table;
-    protected Pair<InetAddress, Long> context;
     protected final IStreamCallback callback;
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
-    protected AbstractStreamSession(String table, Pair<InetAddress, Long> context, IStreamCallback callback)
+    protected AbstractStreamSession(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
     {
+        this.host = host;
+        this.sessionId = sessionId;
         this.table = table;
-        this.context = context;
         this.callback = callback;
         Gossiper.instance.register(this);
         FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
 
-    public int getSourceFlag()
+    public UUID getSessionId()
     {
-        return (int)(context.right >> 32);
-    }
-
-    public long getSessionId()
-    {
-        return context.right;
+        return sessionId;
     }
 
     public InetAddress getHost()
     {
-        return context.left;
+        return host;
     }
 
     public void close(boolean success)

src/java/org/apache/cassandra/streaming/FileStreamTask.java

             // (at this point, if we fail, it is the receiver's job to re-request)
             stream();
 
-            StreamOutSession session = StreamOutSession.get(to, header.sessionId);
+            StreamOutSession session = StreamOutSession.get(header.sessionId);
             if (session == null)
             {
                 logger.info("Found no stream out session at end of file stream task - this is expected if the receiver went down");
         }
         catch (IOException e)
         {
-            StreamOutSession session = StreamOutSession.get(to, header.sessionId);
+            StreamOutSession session = StreamOutSession.get(header.sessionId);
             if (session != null)
                 session.close(false);
             throw e;

src/java/org/apache/cassandra/streaming/IncomingStreamReader.java

     public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
     {
         socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
-        InetAddress host = header.broadcastAddress != null ? header.broadcastAddress
-                           : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
+        InetAddress host = ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
         if (header.pendingFiles.isEmpty() && header.file != null)
         {
             // StreamInSession should be created already when receiving 2nd and after files
-            if (!StreamInSession.hasSession(host, header.sessionId))
+            if (!StreamInSession.hasSession(header.sessionId))
             {
                 StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE);
                 OutboundTcpConnection.write(reply.createMessage(),
-                                            Long.toString(header.sessionId),
+                                            header.sessionId.toString(),
                                             System.currentTimeMillis(),
                                             new DataOutputStream(socket.getOutputStream()),
                                             MessagingService.instance().getVersion(host));
         {
             underliningStream = null;
         }
-        metrics = StreamingMetrics.get(socket.getInetAddress());
+        metrics = StreamingMetrics.get(host);
     }
 
     /**

src/java/org/apache/cassandra/streaming/StreamHeader.java

  */
 package org.apache.cassandra.streaming;
 
-import java.io.*;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 public class StreamHeader
 {
     public static final IVersionedSerializer<StreamHeader> serializer = new StreamHeaderSerializer();
 
-    // Streaming sessionId flags, used to avoid duplicate session id's between nodes.
-    // See StreamInSession and StreamOutSession
-    public static final int STREAM_IN_SOURCE_FLAG = 0;
-    public static final int STREAM_OUT_SOURCE_FLAG = 1;
-
     public final String table;
 
     /** file being sent on initial stream */
     public final PendingFile file;
 
-    /** session is tuple of (host, sessionid) */
-    public final long sessionId;
+    /** session ID */
+    public final UUID sessionId;
 
     /** files to add to the session */
     public final Collection<PendingFile> pendingFiles;
 
-    /** Address of the sender **/
-    public final InetAddress broadcastAddress;
-
-    public StreamHeader(String table, long sessionId, PendingFile file)
+    public StreamHeader(String table, UUID sessionId, PendingFile file)
     {
         this(table, sessionId, file, Collections.<PendingFile>emptyList());
     }
 
-    public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles)
-    {
-        this(table, sessionId, first, pendingFiles, FBUtilities.getBroadcastAddress());
-    }
-
-    public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles, InetAddress broadcastAddress)
+    public StreamHeader(String table, UUID sessionId, PendingFile first, Collection<PendingFile> pendingFiles)
     {
         this.table = table;
         this.sessionId  = sessionId;
         this.file = first;
         this.pendingFiles = pendingFiles;
-        this.broadcastAddress = broadcastAddress;
     }
 
     private static class StreamHeaderSerializer implements IVersionedSerializer<StreamHeader>
         public void serialize(StreamHeader sh, DataOutput dos, int version) throws IOException
         {
             dos.writeUTF(sh.table);
-            dos.writeLong(sh.sessionId);
+            UUIDSerializer.serializer.serialize(sh.sessionId, dos, MessagingService.current_version);
             PendingFile.serializer.serialize(sh.file, dos, version);
             dos.writeInt(sh.pendingFiles.size());
-            for(PendingFile file : sh.pendingFiles)
-            {
+            for (PendingFile file : sh.pendingFiles)
                 PendingFile.serializer.serialize(file, dos, version);
-            }
-            CompactEndpointSerializationHelper.serialize(sh.broadcastAddress, dos);
         }
 
         public StreamHeader deserialize(DataInput dis, int version) throws IOException
         {
             String table = dis.readUTF();
-            long sessionId = dis.readLong();
+            UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version);
             PendingFile file = PendingFile.serializer.deserialize(dis, version);
             int size = dis.readInt();
 
             List<PendingFile> pendingFiles = new ArrayList<PendingFile>(size);
             for (int i = 0; i < size; i++)
-            {
                 pendingFiles.add(PendingFile.serializer.deserialize(dis, version));
-            }
-            InetAddress bca = null;
-            if (version > MessagingService.VERSION_10)
-                bca = CompactEndpointSerializationHelper.deserialize(dis);
-            return new StreamHeader(table, sessionId, file, pendingFiles, bca);
+            return new StreamHeader(table, sessionId, file, pendingFiles);
         }
 
         public long serializedSize(StreamHeader sh, int version)
             size += TypeSizes.NATIVE.sizeof(sh.sessionId);
             size += PendingFile.serializer.serializedSize(sh.file, version);
             size += TypeSizes.NATIVE.sizeof(sh.pendingFiles.size());
-            for(PendingFile file : sh.pendingFiles)
+            for (PendingFile file : sh.pendingFiles)
                 size += PendingFile.serializer.serializedSize(file, version);
-            size += CompactEndpointSerializationHelper.serializedSize(sh.broadcastAddress);
             return size;
        }
     }

src/java/org/apache/cassandra/streaming/StreamInSession.java

 import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.MessagingService;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.gms.*;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.OutboundTcpConnection;
-import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.apache.cassandra.utils.UUIDGen;
 
 /** each context gets its own StreamInSession. So there may be >1 Session per host */
 public class StreamInSession extends AbstractStreamSession
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
 
-    private static final ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>();
+    private static final ConcurrentMap<UUID, StreamInSession> sessions = new NonBlockingHashMap<UUID, StreamInSession>();
 
     private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>();
     private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
     private PendingFile current;
     private Socket socket;
     private volatile int retries;
-    private final static AtomicInteger sessionIdCounter = new AtomicInteger(0);
-
-    /**
-     * The next session id is a combination of a local integer counter and a flag used to avoid collisions
-     * between session id's generated on different machines. Nodes can may have StreamOutSessions with the
-     * following contexts:
-     *
-     * <1.1.1.1, (stream_in_flag, 6)>
-     * <1.1.1.1, (stream_out_flag, 6)>
-     *
-     * The first is an out stream created in response to a request from node 1.1.1.1. The  id (6) was created by
-     * the requesting node. The second is an out stream created by this node to push to 1.1.1.1. The  id (6) was
-     * created by this node.
-     *
-     * Note: The StreamInSession results in a StreamOutSession on the target that uses the StreamInSession sessionId.
-     *
-     * @return next StreamInSession sessionId
-     */
-    private static long nextSessionId()
-    {
-        return (((long)StreamHeader.STREAM_IN_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet());
-    }
 
-    private StreamInSession(Pair<InetAddress, Long> context, IStreamCallback callback)
+    private StreamInSession(InetAddress host, UUID sessionId, IStreamCallback callback)
     {
-        super(null, context, callback);
+        super(null, host, sessionId, callback);
     }
 
     public static StreamInSession create(InetAddress host, IStreamCallback callback)
     {
-        Pair<InetAddress, Long> context = Pair.create(host, nextSessionId());
-        StreamInSession session = new StreamInSession(context, callback);
-        sessions.put(context, session);
+        StreamInSession session = new StreamInSession(host, UUIDGen.makeType1UUIDFromHost(host), callback);
+        sessions.put(session.getSessionId(), session);
         return session;
     }
 
-    public static StreamInSession get(InetAddress host, long sessionId)
+    public static StreamInSession get(InetAddress host, UUID sessionId)
     {
-        Pair<InetAddress, Long> context = Pair.create(host, sessionId);
-        StreamInSession session = sessions.get(context);
+        StreamInSession session = sessions.get(sessionId);
         if (session == null)
         {
-            StreamInSession possibleNew = new StreamInSession(context, null);
-            if ((session = sessions.putIfAbsent(context, possibleNew)) == null)
+            StreamInSession possibleNew = new StreamInSession(host, sessionId, null);
+            if ((session = sessions.putIfAbsent(sessionId, possibleNew)) == null)
                 session = possibleNew;
         }
         return session;
     }
 
-    public static boolean hasSession(InetAddress host, long sessionId)
+    public static boolean hasSession(UUID sessionId)
     {
-        Pair<InetAddress, Long> context = Pair.create(host, sessionId);
-        return sessions.get(context) != null;
+        return sessions.get(sessionId) != null;
     }
 
     public void setCurrentFile(PendingFile file)
         retries++;
         if (retries > DatabaseDescriptor.getMaxStreamingRetries())
         {
-            logger.error(String.format("Failed streaming session %d from %s while receiving %s", getSessionId(), getHost().toString(), current),
+            logger.error(String.format("Failed streaming session %s from %s while receiving %s", getSessionId(), getHost().toString(), current),
                          new IllegalStateException("Too many retries for " + remoteFile));
             close(false);
             return;
             {
                 if (socket != null)
                     OutboundTcpConnection.write(reply.createMessage(),
-                                                context.right.toString(),
+                                                sessionId.toString(),
                                                 System.currentTimeMillis(),
                                                 new DataOutputStream(socket.getOutputStream()),
                                                 MessagingService.instance().getVersion(getHost()));
 
     protected void closeInternal(boolean success)
     {
-        sessions.remove(context);
+        sessions.remove(sessionId);
         if (!success && FailureDetector.instance.isAlive(getHost()))
         {
             StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE);
     public static Set<PendingFile> getIncomingFiles(InetAddress host)
     {
         Set<PendingFile> set = new HashSet<PendingFile>();
-        for (Map.Entry<Pair<InetAddress, Long>, StreamInSession> entry : sessions.entrySet())
+        for (Map.Entry<UUID, StreamInSession> entry : sessions.entrySet())
         {
-            if (entry.getKey().left.equals(host))
+            StreamInSession session = entry.getValue();
+            if (session.getHost().equals(host))
             {
-                StreamInSession session = entry.getValue();
                 if (session.current != null)
                     set.add(session.current);
                 set.addAll(session.files);

src/java/org/apache/cassandra/streaming/StreamOutSession.java

 
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
     private static final Logger logger = LoggerFactory.getLogger(StreamOutSession.class);
 
     // one host may have multiple stream sessions.
-    private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession> streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
-    private final static AtomicInteger sessionIdCounter = new AtomicInteger(0);
-
-    /**
-     * The next session id is a combination of a local integer counter and a flag used to avoid collisions
-     * between session id's generated on different machines. Nodes can may have StreamOutSessions with the
-     * following contexts:
-     *
-     * <1.1.1.1, (stream_in_flag, 6)>
-     * <1.1.1.1, (stream_out_flag, 6)>
-     *
-     * The first is an out stream created in response to a request from node 1.1.1.1. The  id (6) was created by
-     * the requesting node. The second is an out stream created by this node to push to 1.1.1.1. The  id (6) was
-     * created by this node.
-     * @return next StreamOutSession sessionId
-     */
-    private static long nextSessionId()
-    {
-        return (((long)StreamHeader.STREAM_OUT_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet());
-    }
+    private static final ConcurrentMap<UUID, StreamOutSession> streams = new NonBlockingHashMap<UUID, StreamOutSession>();
 
     public static StreamOutSession create(String table, InetAddress host, IStreamCallback callback)
     {
-        return create(table, host, nextSessionId(), callback);
+        return create(table, host, UUIDGen.makeType1UUIDFromHost(host), callback);
     }
 
-    public static StreamOutSession create(String table, InetAddress host, long sessionId)
+    public static StreamOutSession create(String table, InetAddress host, UUID sessionId)
     {
         return create(table, host, sessionId, null);
     }
 
-    public static StreamOutSession create(String table, InetAddress host, long sessionId, IStreamCallback callback)
+    public static StreamOutSession create(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
     {
-        Pair<InetAddress, Long> context = Pair.create(host, sessionId);
-        StreamOutSession session = new StreamOutSession(table, context, callback);
-        streams.put(context, session);
+        StreamOutSession session = new StreamOutSession(table, host, sessionId, callback);
+        streams.put(sessionId, session);
         return session;
     }
 
-    public static StreamOutSession get(InetAddress host, long sessionId)
+    public static StreamOutSession get(UUID sessionId)
     {
-        return streams.get(Pair.create(host, sessionId));
+        return streams.get(sessionId);
     }
 
     private final Map<String, PendingFile> files = new NonBlockingHashMap<String, PendingFile>();
 
     private volatile String currentFile;
 
-    private StreamOutSession(String table, Pair<InetAddress, Long> context, IStreamCallback callback)
+    private StreamOutSession(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
     {
-        super(table, context, callback);
+        super(table, host, sessionId, callback);
     }
 
     public void addFilesToStream(List<PendingFile> pendingFiles)
         // Release reference on last file (or any uncompleted ones)
         for (PendingFile file : files.values())
             file.sstable.releaseReference();
-        streams.remove(context);
+        streams.remove(sessionId);
     }
 
     /** convenience method for use when testing */
     void await() throws InterruptedException
     {
-        while (streams.containsKey(context))
+        while (streams.containsKey(sessionId))
             Thread.sleep(10);
     }
 
     public static List<PendingFile> getOutgoingFiles(InetAddress host)
     {
         List<PendingFile> list = new ArrayList<PendingFile>();
-        for (Map.Entry<Pair<InetAddress, Long>, StreamOutSession> entry : streams.entrySet())
+        for (Map.Entry<UUID, StreamOutSession> entry : streams.entrySet())
         {
-            if (entry.getKey().left.equals(host))
-                list.addAll(entry.getValue().getFiles());
+            StreamOutSession session = entry.getValue();
+            if (session.getHost().equals(host))
+                list.addAll(session.getFiles());
         }
         return list;
     }

src/java/org/apache/cassandra/streaming/StreamReply.java

 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 public class StreamReply
 {
 
     public static final IVersionedSerializer<StreamReply> serializer = new FileStatusSerializer();
 
-    public final long sessionId;
+    public final UUID sessionId;
     public final String file;
     public final Status action;
 
-    public StreamReply(String file, long sessionId, Status action)
+    public StreamReply(String file, UUID sessionId, Status action)
     {
         this.file = file;
         this.action = action;
     {
         public void serialize(StreamReply reply, DataOutput dos, int version) throws IOException
         {
-            dos.writeLong(reply.sessionId);
+            UUIDSerializer.serializer.serialize(reply.sessionId, dos, MessagingService.current_version);
             dos.writeUTF(reply.file);
             dos.writeInt(reply.action.ordinal());
         }
 
         public StreamReply deserialize(DataInput dis, int version) throws IOException
         {
-            long sessionId = dis.readLong();
+            UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version);
             String targetFile = dis.readUTF();
             Status action = Status.values()[dis.readInt()];
             return new StreamReply(targetFile, sessionId, action);

src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java

     {
         StreamReply reply = message.payload;
         logger.debug("Received StreamReply {}", reply);
-        StreamOutSession session = StreamOutSession.get(message.from, reply.sessionId);
+        StreamOutSession session = StreamOutSession.get(reply.sessionId);
         if (session == null)
         {
             logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.from);

src/java/org/apache/cassandra/streaming/StreamRequest.java

 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
 * This class encapsulates the message that needs to be sent to nodes
 {
     public static final IVersionedSerializer<StreamRequest> serializer = new StreamRequestSerializer();
 
-    protected final long sessionId;
+    protected final UUID sessionId;
     protected final InetAddress target;
 
     // if this is specified, ranges and table should not be.
     protected final PendingFile file;
 
-    // if these are specified, file shoud not be.
+    // if these are specified, file should not be.
     protected final Collection<Range<Token>> ranges;
     protected final String table;
     protected final Iterable<ColumnFamilyStore> columnFamilies;
     protected final OperationType type;
 
-    StreamRequest(InetAddress target, Collection<Range<Token>> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, long sessionId, OperationType type)
+    StreamRequest(InetAddress target, Collection<Range<Token>> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, UUID sessionId, OperationType type)
     {
         this.target = target;
         this.ranges = ranges;
         file = null;
     }
 
-    StreamRequest(InetAddress target, PendingFile file, long sessionId)
+    StreamRequest(InetAddress target, PendingFile file, UUID sessionId)
     {
         this.target = target;
         this.file = file;
             sb.append("@");
             sb.append(target);
             sb.append("------->");
-            for ( Range<Token> range : ranges )
+            for (Range<Token> range : ranges)
             {
                 sb.append(range);
                 sb.append(" ");
     {
         public void serialize(StreamRequest srm, DataOutput dos, int version) throws IOException
         {
-            dos.writeLong(srm.sessionId);
+            UUIDSerializer.serializer.serialize(srm.sessionId, dos, MessagingService.current_version);
             CompactEndpointSerializationHelper.serialize(srm.target, dos);
             if (srm.file != null)
             {
 
         public StreamRequest deserialize(DataInput dis, int version) throws IOException
         {
-            long sessionId = dis.readLong();
+            UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version);
             InetAddress target = CompactEndpointSerializationHelper.deserialize(dis);
             boolean singleFile = dis.readBoolean();
             if (singleFile)
                 String table = dis.readUTF();
                 int size = dis.readInt();
                 List<Range<Token>> ranges = (size == 0) ? null : new ArrayList<Range<Token>>(size);
-                for( int i = 0; i < size; ++i )
+                for (int i = 0; i < size; ++i)
                     ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(dis, version).toTokenBounds());
-                OperationType type = OperationType.RESTORE_REPLICA_COUNT;
-                type = OperationType.valueOf(dis.readUTF());
+                OperationType type = OperationType.valueOf(dis.readUTF());
 
                 List<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>();
                 int cfsSize = dis.readInt();

src/java/org/apache/cassandra/tools/NodeCmd.java

         GETENDPOINTS,
         GETSSTABLES,
         GOSSIPINFO,
+        HELP,
         INFO,
         INVALIDATEKEYCACHE,
         INVALIDATEROWCACHE,
 
             switch (command)
             {
+                case HELP : printUsage(); break;
                 case RING :
                     if (arguments.length > 0) { nodeCmd.printRing(System.out, arguments[0]); }
                     else                      { nodeCmd.printRing(System.out, null); };

test/data/serialization/1.2/streaming.StreamHeader.bin

Binary file modified.

test/data/serialization/1.2/streaming.StreamReply.bin

Binary file modified.

test/data/serialization/1.2/streaming.StreamRequestMessage.bin

Binary file modified.

test/unit/org/apache/cassandra/streaming/SerializationsTest.java

 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 import org.junit.Test;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
 
     private void testStreamHeaderWrite() throws IOException
     {
-        StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, makePendingFile(true, 100, OperationType.BOOTSTRAP));
-        StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, makePendingFile(false, 100, OperationType.BOOTSTRAP));
+        UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
+        StreamHeader sh0 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP));
+        StreamHeader sh1 = new StreamHeader("Keyspace1", sessionId, makePendingFile(false, 100, OperationType.BOOTSTRAP));
         Collection<PendingFile> files = new ArrayList<PendingFile>();
         for (int i = 0; i < 50; i++)
             files.add(makePendingFile(i % 2 == 0, 100, OperationType.BOOTSTRAP));
-        StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100, OperationType.BOOTSTRAP), files);
-        StreamHeader sh3 = new StreamHeader("Keyspace1", 125L, null, files);
-        StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100, OperationType.BOOTSTRAP), new ArrayList<PendingFile>());
+        StreamHeader sh2 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP), files);
+        StreamHeader sh3 = new StreamHeader("Keyspace1", sessionId, null, files);
+        StreamHeader sh4 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP), new ArrayList<PendingFile>());
 
         DataOutputStream out = getOutput("streaming.StreamHeader.bin");
         StreamHeader.serializer.serialize(sh0, out, getVersion());
 
     private void testStreamReplyWrite() throws IOException
     {
-        StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED);
+        UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
+        StreamReply rep = new StreamReply("this is a file", sessionId, StreamReply.Status.FILE_FINISHED);
         DataOutputStream out = getOutput("streaming.StreamReply.bin");
         StreamReply.serializer.serialize(rep, out, getVersion());
         rep.createMessage().serialize(out, getVersion());
 
     private void testStreamRequestMessageWrite() throws IOException
     {
+        UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
         Collection<Range<Token>> ranges = new ArrayList<Range<Token>>();
         for (int i = 0; i < 5; i++)
             ranges.add(new Range<Token>(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5)))));
         List<ColumnFamilyStore> stores = Collections.singletonList(Table.open("Keyspace1").getColumnFamilyStore("Standard1"));
-        StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges, "Keyspace1", stores, 123L, OperationType.RESTORE_REPLICA_COUNT);
-        StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), 124L);
-        StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), 124L);
+        StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges, "Keyspace1", stores, sessionId, OperationType.RESTORE_REPLICA_COUNT);
+        StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), sessionId);
+        StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), sessionId);
 
         DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin");
         StreamRequest.serializer.serialize(msg0, out, getVersion());

test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

         List<Range<Token>> ranges = new ArrayList<Range<Token>>();
         ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
         ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
-        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
+        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback)null);
         StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
         session.await();
     }
         // Acquiring references, transferSSTables needs it
         sstable.acquireReference();
         sstable2.acquireReference();
-        StreamOutSession session = StreamOutSession.create(tablename, LOCAL, null);
+        StreamOutSession session = StreamOutSession.create(tablename, LOCAL, (IStreamCallback) null);
         StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP);
         session.await();
 
         if (!SSTableReader.acquireReferences(ssTableReaders))
             throw new AssertionError();
 
-        StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null);
+        StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, (IStreamCallback)null);
         StreamOut.transferSSTables(session, ssTableReaders, ranges, OperationType.BOOTSTRAP);
 
         session.await();