cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [4/6] git commit: change stream session ID from (host, counter) to TimeUUID; patch by yukim reviewed by Michael Kjellman for CASSANDRA-4813
Date Tue, 13 Nov 2012 22:20:10 GMT
change stream session ID from (host, counter) to TimeUUID; patch by yukim reviewed by Michael
Kjellman for CASSANDRA-4813


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/901a54a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/901a54a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/901a54a6

Branch: refs/heads/trunk
Commit: 901a54a6d6c7b6f7276088e67b45f95dcc7b57f7
Parents: 228d1cf
Author: Yuki Morishita <yukim@apache.org>
Authored: Tue Nov 13 16:18:07 2012 -0600
Committer: Yuki Morishita <yukim@apache.org>
Committed: Tue Nov 13 16:18:07 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/net/OutboundTcpConnectionPool.java   |    2 +-
 .../cassandra/streaming/AbstractStreamSession.java |   20 ++---
 .../apache/cassandra/streaming/FileStreamTask.java |    4 +-
 .../cassandra/streaming/IncomingStreamReader.java  |    9 +-
 .../apache/cassandra/streaming/StreamHeader.java   |   54 +++--------
 .../cassandra/streaming/StreamInSession.java       |   70 +++++----------
 .../cassandra/streaming/StreamOutSession.java      |   53 ++++-------
 .../apache/cassandra/streaming/StreamReply.java    |   10 ++-
 .../streaming/StreamReplyVerbHandler.java          |    2 +-
 .../apache/cassandra/streaming/StreamRequest.java  |   21 +++--
 .../serialization/1.2/streaming.StreamHeader.bin   |  Bin 175902 -> 175917 bytes
 .../serialization/1.2/streaming.StreamReply.bin    |  Bin 73 -> 89 bytes
 .../1.2/streaming.StreamRequestMessage.bin         |  Bin 7167 -> 7215 bytes
 .../cassandra/streaming/SerializationsTest.java    |   27 +++---
 .../cassandra/streaming/StreamingTransferTest.java |    6 +-
 16 files changed, 106 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 518d3ec..4f54598 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * Exclude gcable tombstones from merkle-tree computation (CASSANDRA-4905)
  * Better printing of AbstractBounds for tracing (CASSANDRA-4931)
  * Optimize mostRecentTomstone check in CC.collectAllData (CASSANDRA-4883)
+ * Change stream session ID to UUID to avoid collision from same node (CASSANDRA-4813)
 Merged from 1.1:
  * reset getRangeSlice filter after finishing a row for get_paged_slice
    (CASSANDRA-4919)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 4d9ce63..237363d 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -124,7 +124,7 @@ public class OutboundTcpConnectionPool
         else
         {
             Socket socket = SocketChannel.open(new InetSocketAddress(endPoint(), DatabaseDescriptor.getStoragePort())).socket();
-            if (Config.getOutboundBindAny())
+            if (Config.getOutboundBindAny() && !socket.isBound())
                 socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
             return socket;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
index d190506..dd7d922 100644
--- a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.LoggerFactory;
@@ -32,33 +33,30 @@ public abstract class AbstractStreamSession implements IEndpointStateChangeSubsc
 {
     private static final Logger logger = LoggerFactory.getLogger(AbstractStreamSession.class);
 
+    protected final InetAddress host;
+    protected final UUID sessionId;
     protected String table;
-    protected Pair<InetAddress, Long> context;
     protected final IStreamCallback callback;
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
-    protected AbstractStreamSession(String table, Pair<InetAddress, Long> context,
IStreamCallback callback)
+    protected AbstractStreamSession(String table, InetAddress host, UUID sessionId, IStreamCallback
callback)
     {
+        this.host = host;
+        this.sessionId = sessionId;
         this.table = table;
-        this.context = context;
         this.callback = callback;
         Gossiper.instance.register(this);
         FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
 
-    public int getSourceFlag()
+    public UUID getSessionId()
     {
-        return (int)(context.right >> 32);
-    }
-
-    public long getSessionId()
-    {
-        return context.right;
+        return sessionId;
     }
 
     public InetAddress getHost()
     {
-        return context.left;
+        return host;
     }
 
     public void close(boolean success)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index f4162de..67d5c35 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -90,7 +90,7 @@ public class FileStreamTask extends WrappedRunnable
             // (at this point, if we fail, it is the receiver's job to re-request)
             stream();
 
-            StreamOutSession session = StreamOutSession.get(to, header.sessionId);
+            StreamOutSession session = StreamOutSession.get(header.sessionId);
             if (session == null)
             {
                 logger.info("Found no stream out session at end of file stream task - this
is expected if the receiver went down");
@@ -104,7 +104,7 @@ public class FileStreamTask extends WrappedRunnable
         }
         catch (IOException e)
         {
-            StreamOutSession session = StreamOutSession.get(to, header.sessionId);
+            StreamOutSession session = StreamOutSession.get(header.sessionId);
             if (session != null)
                 session.close(false);
             throw e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index b4bea58..656a99d 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -60,16 +60,15 @@ public class IncomingStreamReader
     public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
     {
         socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
-        InetAddress host = header.broadcastAddress != null ? header.broadcastAddress
-                           : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
+        InetAddress host = ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
         if (header.pendingFiles.isEmpty() && header.file != null)
         {
             // StreamInSession should be created already when receiving 2nd and after files
-            if (!StreamInSession.hasSession(host, header.sessionId))
+            if (!StreamInSession.hasSession(header.sessionId))
             {
                 StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE);
                 OutboundTcpConnection.write(reply.createMessage(),
-                                            Long.toString(header.sessionId),
+                                            header.sessionId.toString(),
                                             System.currentTimeMillis(),
                                             new DataOutputStream(socket.getOutputStream()),
                                             MessagingService.instance().getVersion(host));
@@ -98,7 +97,7 @@ public class IncomingStreamReader
         {
             underliningStream = null;
         }
-        metrics = StreamingMetrics.get(socket.getInetAddress());
+        metrics = StreamingMetrics.get(host);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHeader.java b/src/java/org/apache/cassandra/streaming/StreamHeader.java
index 00a9a43..7f3e654 100644
--- a/src/java/org/apache/cassandra/streaming/StreamHeader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamHeader.java
@@ -17,59 +17,42 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.*;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 public class StreamHeader
 {
     public static final IVersionedSerializer<StreamHeader> serializer = new StreamHeaderSerializer();
 
-    // Streaming sessionId flags, used to avoid duplicate session id's between nodes.
-    // See StreamInSession and StreamOutSession
-    public static final int STREAM_IN_SOURCE_FLAG = 0;
-    public static final int STREAM_OUT_SOURCE_FLAG = 1;
-
     public final String table;
 
     /** file being sent on initial stream */
     public final PendingFile file;
 
-    /** session is tuple of (host, sessionid) */
-    public final long sessionId;
+    /** session ID */
+    public final UUID sessionId;
 
     /** files to add to the session */
     public final Collection<PendingFile> pendingFiles;
 
-    /** Address of the sender **/
-    public final InetAddress broadcastAddress;
-
-    public StreamHeader(String table, long sessionId, PendingFile file)
+    public StreamHeader(String table, UUID sessionId, PendingFile file)
     {
         this(table, sessionId, file, Collections.<PendingFile>emptyList());
     }
 
-    public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile>
pendingFiles)
-    {
-        this(table, sessionId, first, pendingFiles, FBUtilities.getBroadcastAddress());
-    }
-
-    public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile>
pendingFiles, InetAddress broadcastAddress)
+    public StreamHeader(String table, UUID sessionId, PendingFile first, Collection<PendingFile>
pendingFiles)
     {
         this.table = table;
         this.sessionId  = sessionId;
         this.file = first;
         this.pendingFiles = pendingFiles;
-        this.broadcastAddress = broadcastAddress;
     }
 
     private static class StreamHeaderSerializer implements IVersionedSerializer<StreamHeader>
@@ -77,32 +60,24 @@ public class StreamHeader
         public void serialize(StreamHeader sh, DataOutput dos, int version) throws IOException
         {
             dos.writeUTF(sh.table);
-            dos.writeLong(sh.sessionId);
+            UUIDSerializer.serializer.serialize(sh.sessionId, dos, MessagingService.current_version);
             PendingFile.serializer.serialize(sh.file, dos, version);
             dos.writeInt(sh.pendingFiles.size());
-            for(PendingFile file : sh.pendingFiles)
-            {
+            for (PendingFile file : sh.pendingFiles)
                 PendingFile.serializer.serialize(file, dos, version);
-            }
-            CompactEndpointSerializationHelper.serialize(sh.broadcastAddress, dos);
         }
 
         public StreamHeader deserialize(DataInput dis, int version) throws IOException
         {
             String table = dis.readUTF();
-            long sessionId = dis.readLong();
+            UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version);
             PendingFile file = PendingFile.serializer.deserialize(dis, version);
             int size = dis.readInt();
 
             List<PendingFile> pendingFiles = new ArrayList<PendingFile>(size);
             for (int i = 0; i < size; i++)
-            {
                 pendingFiles.add(PendingFile.serializer.deserialize(dis, version));
-            }
-            InetAddress bca = null;
-            if (version > MessagingService.VERSION_10)
-                bca = CompactEndpointSerializationHelper.deserialize(dis);
-            return new StreamHeader(table, sessionId, file, pendingFiles, bca);
+            return new StreamHeader(table, sessionId, file, pendingFiles);
         }
 
         public long serializedSize(StreamHeader sh, int version)
@@ -111,9 +86,8 @@ public class StreamHeader
             size += TypeSizes.NATIVE.sizeof(sh.sessionId);
             size += PendingFile.serializer.serializedSize(sh.file, version);
             size += TypeSizes.NATIVE.sizeof(sh.pendingFiles.size());
-            for(PendingFile file : sh.pendingFiles)
+            for (PendingFile file : sh.pendingFiles)
                 size += PendingFile.serializer.serializedSize(file, version);
-            size += CompactEndpointSerializationHelper.serializedSize(sh.broadcastAddress);
             return size;
        }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index f8bde91..a61ceb7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -23,88 +23,62 @@ import java.net.InetAddress;
 import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.MessagingService;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.gms.*;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.OutboundTcpConnection;
-import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.apache.cassandra.utils.UUIDGen;
 
 /** each context gets its own StreamInSession. So there may be >1 Session per host */
 public class StreamInSession extends AbstractStreamSession
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
 
-    private static final ConcurrentMap<Pair<InetAddress, Long>, StreamInSession>
sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>();
+    private static final ConcurrentMap<UUID, StreamInSession> sessions = new NonBlockingHashMap<UUID,
StreamInSession>();
 
     private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>();
     private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
     private PendingFile current;
     private Socket socket;
     private volatile int retries;
-    private final static AtomicInteger sessionIdCounter = new AtomicInteger(0);
-
-    /**
-     * The next session id is a combination of a local integer counter and a flag used to
avoid collisions
-     * between session id's generated on different machines. Nodes can may have StreamOutSessions
with the
-     * following contexts:
-     *
-     * <1.1.1.1, (stream_in_flag, 6)>
-     * <1.1.1.1, (stream_out_flag, 6)>
-     *
-     * The first is an out stream created in response to a request from node 1.1.1.1. The
 id (6) was created by
-     * the requesting node. The second is an out stream created by this node to push to 1.1.1.1.
The  id (6) was
-     * created by this node.
-     *
-     * Note: The StreamInSession results in a StreamOutSession on the target that uses the
StreamInSession sessionId.
-     *
-     * @return next StreamInSession sessionId
-     */
-    private static long nextSessionId()
-    {
-        return (((long)StreamHeader.STREAM_IN_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet());
-    }
 
-    private StreamInSession(Pair<InetAddress, Long> context, IStreamCallback callback)
+    private StreamInSession(InetAddress host, UUID sessionId, IStreamCallback callback)
     {
-        super(null, context, callback);
+        super(null, host, sessionId, callback);
     }
 
     public static StreamInSession create(InetAddress host, IStreamCallback callback)
     {
-        Pair<InetAddress, Long> context = Pair.create(host, nextSessionId());
-        StreamInSession session = new StreamInSession(context, callback);
-        sessions.put(context, session);
+        StreamInSession session = new StreamInSession(host, UUIDGen.makeType1UUIDFromHost(host),
callback);
+        sessions.put(session.getSessionId(), session);
         return session;
     }
 
-    public static StreamInSession get(InetAddress host, long sessionId)
+    public static StreamInSession get(InetAddress host, UUID sessionId)
     {
-        Pair<InetAddress, Long> context = Pair.create(host, sessionId);
-        StreamInSession session = sessions.get(context);
+        StreamInSession session = sessions.get(sessionId);
         if (session == null)
         {
-            StreamInSession possibleNew = new StreamInSession(context, null);
-            if ((session = sessions.putIfAbsent(context, possibleNew)) == null)
+            StreamInSession possibleNew = new StreamInSession(host, sessionId, null);
+            if ((session = sessions.putIfAbsent(sessionId, possibleNew)) == null)
                 session = possibleNew;
         }
         return session;
     }
 
-    public static boolean hasSession(InetAddress host, long sessionId)
+    public static boolean hasSession(UUID sessionId)
     {
-        Pair<InetAddress, Long> context = Pair.create(host, sessionId);
-        return sessions.get(context) != null;
+        return sessions.get(sessionId) != null;
     }
 
     public void setCurrentFile(PendingFile file)
@@ -227,7 +201,7 @@ public class StreamInSession extends AbstractStreamSession
             {
                 if (socket != null)
                     OutboundTcpConnection.write(reply.createMessage(),
-                                                context.right.toString(),
+                                                sessionId.toString(),
                                                 System.currentTimeMillis(),
                                                 new DataOutputStream(socket.getOutputStream()),
                                                 MessagingService.instance().getVersion(getHost()));
@@ -246,7 +220,7 @@ public class StreamInSession extends AbstractStreamSession
 
     protected void closeInternal(boolean success)
     {
-        sessions.remove(context);
+        sessions.remove(sessionId);
         if (!success && FailureDetector.instance.isAlive(getHost()))
         {
             StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE);
@@ -269,11 +243,11 @@ public class StreamInSession extends AbstractStreamSession
     public static Set<PendingFile> getIncomingFiles(InetAddress host)
     {
         Set<PendingFile> set = new HashSet<PendingFile>();
-        for (Map.Entry<Pair<InetAddress, Long>, StreamInSession> entry : sessions.entrySet())
+        for (Map.Entry<UUID, StreamInSession> entry : sessions.entrySet())
         {
-            if (entry.getKey().left.equals(host))
+            StreamInSession session = entry.getValue();
+            if (session.getHost().equals(host))
             {
-                StreamInSession session = entry.getValue();
                 if (session.current != null)
                     set.add(session.current);
                 set.addAll(session.files);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
index e1f42dc..ad312ff 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
@@ -38,57 +40,37 @@ public class StreamOutSession extends AbstractStreamSession
     private static final Logger logger = LoggerFactory.getLogger(StreamOutSession.class);
 
     // one host may have multiple stream sessions.
-    private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession>
streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
-    private final static AtomicInteger sessionIdCounter = new AtomicInteger(0);
-
-    /**
-     * The next session id is a combination of a local integer counter and a flag used to
avoid collisions
-     * between session id's generated on different machines. Nodes can may have StreamOutSessions
with the
-     * following contexts:
-     *
-     * <1.1.1.1, (stream_in_flag, 6)>
-     * <1.1.1.1, (stream_out_flag, 6)>
-     *
-     * The first is an out stream created in response to a request from node 1.1.1.1. The
 id (6) was created by
-     * the requesting node. The second is an out stream created by this node to push to 1.1.1.1.
The  id (6) was
-     * created by this node.
-     * @return next StreamOutSession sessionId
-     */
-    private static long nextSessionId()
-    {
-        return (((long)StreamHeader.STREAM_OUT_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet());
-    }
+    private static final ConcurrentMap<UUID, StreamOutSession> streams = new NonBlockingHashMap<UUID,
StreamOutSession>();
 
     public static StreamOutSession create(String table, InetAddress host, IStreamCallback
callback)
     {
-        return create(table, host, nextSessionId(), callback);
+        return create(table, host, UUIDGen.makeType1UUIDFromHost(host), callback);
     }
 
-    public static StreamOutSession create(String table, InetAddress host, long sessionId)
+    public static StreamOutSession create(String table, InetAddress host, UUID sessionId)
     {
         return create(table, host, sessionId, null);
     }
 
-    public static StreamOutSession create(String table, InetAddress host, long sessionId,
IStreamCallback callback)
+    public static StreamOutSession create(String table, InetAddress host, UUID sessionId,
IStreamCallback callback)
     {
-        Pair<InetAddress, Long> context = Pair.create(host, sessionId);
-        StreamOutSession session = new StreamOutSession(table, context, callback);
-        streams.put(context, session);
+        StreamOutSession session = new StreamOutSession(table, host, sessionId, callback);
+        streams.put(sessionId, session);
         return session;
     }
 
-    public static StreamOutSession get(InetAddress host, long sessionId)
+    public static StreamOutSession get(UUID sessionId)
     {
-        return streams.get(Pair.create(host, sessionId));
+        return streams.get(sessionId);
     }
 
     private final Map<String, PendingFile> files = new NonBlockingHashMap<String,
PendingFile>();
 
     private volatile String currentFile;
 
-    private StreamOutSession(String table, Pair<InetAddress, Long> context, IStreamCallback
callback)
+    private StreamOutSession(String table, InetAddress host, UUID sessionId, IStreamCallback
callback)
     {
-        super(table, context, callback);
+        super(table, host, sessionId, callback);
     }
 
     public void addFilesToStream(List<PendingFile> pendingFiles)
@@ -129,13 +111,13 @@ public class StreamOutSession extends AbstractStreamSession
         // Release reference on last file (or any uncompleted ones)
         for (PendingFile file : files.values())
             file.sstable.releaseReference();
-        streams.remove(context);
+        streams.remove(sessionId);
     }
 
     /** convenience method for use when testing */
     void await() throws InterruptedException
     {
-        while (streams.containsKey(context))
+        while (streams.containsKey(sessionId))
             Thread.sleep(10);
     }
 
@@ -157,10 +139,11 @@ public class StreamOutSession extends AbstractStreamSession
     public static List<PendingFile> getOutgoingFiles(InetAddress host)
     {
         List<PendingFile> list = new ArrayList<PendingFile>();
-        for (Map.Entry<Pair<InetAddress, Long>, StreamOutSession> entry : streams.entrySet())
+        for (Map.Entry<UUID, StreamOutSession> entry : streams.entrySet())
         {
-            if (entry.getKey().left.equals(host))
-                list.addAll(entry.getValue().getFiles());
+            StreamOutSession session = entry.getValue();
+            if (session.getHost().equals(host))
+                list.addAll(session.getFiles());
         }
         return list;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java
index bfb65e3..eee8e37 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReply.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReply.java
@@ -20,11 +20,13 @@ package org.apache.cassandra.streaming;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 public class StreamReply
 {
@@ -38,11 +40,11 @@ public class StreamReply
 
     public static final IVersionedSerializer<StreamReply> serializer = new FileStatusSerializer();
 
-    public final long sessionId;
+    public final UUID sessionId;
     public final String file;
     public final Status action;
 
-    public StreamReply(String file, long sessionId, Status action)
+    public StreamReply(String file, UUID sessionId, Status action)
     {
         this.file = file;
         this.action = action;
@@ -68,14 +70,14 @@ public class StreamReply
     {
         public void serialize(StreamReply reply, DataOutput dos, int version) throws IOException
         {
-            dos.writeLong(reply.sessionId);
+            UUIDSerializer.serializer.serialize(reply.sessionId, dos, MessagingService.current_version);
             dos.writeUTF(reply.file);
             dos.writeInt(reply.action.ordinal());
         }
 
         public StreamReply deserialize(DataInput dis, int version) throws IOException
         {
-            long sessionId = dis.readLong();
+            UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version);
             String targetFile = dis.readUTF();
             Status action = Status.values()[dis.readInt()];
             return new StreamReply(targetFile, sessionId, action);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
index 714f76a..ebcee8a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
@@ -31,7 +31,7 @@ public class StreamReplyVerbHandler implements IVerbHandler<StreamReply>
     {
         StreamReply reply = message.payload;
         logger.debug("Received StreamReply {}", reply);
-        StreamOutSession session = StreamOutSession.get(message.from, reply.sessionId);
+        StreamOutSession session = StreamOutSession.get(reply.sessionId);
         if (session == null)
         {
             logger.debug("Received stream action " + reply.action + " for an unknown session
from " + message.from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index a8de4a6..b49fb36 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -24,6 +24,7 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 
 import com.google.common.collect.Iterables;
 
@@ -38,6 +39,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
 * This class encapsulates the message that needs to be sent to nodes
@@ -50,19 +52,19 @@ public class StreamRequest
 {
     public static final IVersionedSerializer<StreamRequest> serializer = new StreamRequestSerializer();
 
-    protected final long sessionId;
+    protected final UUID sessionId;
     protected final InetAddress target;
 
     // if this is specified, ranges and table should not be.
     protected final PendingFile file;
 
-    // if these are specified, file shoud not be.
+    // if these are specified, file should not be.
     protected final Collection<Range<Token>> ranges;
     protected final String table;
     protected final Iterable<ColumnFamilyStore> columnFamilies;
     protected final OperationType type;
 
-    StreamRequest(InetAddress target, Collection<Range<Token>> ranges, String
table, Iterable<ColumnFamilyStore> columnFamilies, long sessionId, OperationType type)
+    StreamRequest(InetAddress target, Collection<Range<Token>> ranges, String
table, Iterable<ColumnFamilyStore> columnFamilies, UUID sessionId, OperationType type)
     {
         this.target = target;
         this.ranges = ranges;
@@ -73,7 +75,7 @@ public class StreamRequest
         file = null;
     }
 
-    StreamRequest(InetAddress target, PendingFile file, long sessionId)
+    StreamRequest(InetAddress target, PendingFile file, UUID sessionId)
     {
         this.target = target;
         this.file = file;
@@ -100,7 +102,7 @@ public class StreamRequest
             sb.append("@");
             sb.append(target);
             sb.append("------->");
-            for ( Range<Token> range : ranges )
+            for (Range<Token> range : ranges)
             {
                 sb.append(range);
                 sb.append(" ");
@@ -118,7 +120,7 @@ public class StreamRequest
     {
         public void serialize(StreamRequest srm, DataOutput dos, int version) throws IOException
         {
-            dos.writeLong(srm.sessionId);
+            UUIDSerializer.serializer.serialize(srm.sessionId, dos, MessagingService.current_version);
             CompactEndpointSerializationHelper.serialize(srm.target, dos);
             if (srm.file != null)
             {
@@ -143,7 +145,7 @@ public class StreamRequest
 
         public StreamRequest deserialize(DataInput dis, int version) throws IOException
         {
-            long sessionId = dis.readLong();
+            UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version);
             InetAddress target = CompactEndpointSerializationHelper.deserialize(dis);
             boolean singleFile = dis.readBoolean();
             if (singleFile)
@@ -156,10 +158,9 @@ public class StreamRequest
                 String table = dis.readUTF();
                 int size = dis.readInt();
                 List<Range<Token>> ranges = (size == 0) ? null : new ArrayList<Range<Token>>(size);
-                for( int i = 0; i < size; ++i )
+                for (int i = 0; i < size; ++i)
                     ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(dis,
version).toTokenBounds());
-                OperationType type = OperationType.RESTORE_REPLICA_COUNT;
-                type = OperationType.valueOf(dis.readUTF());
+                OperationType type = OperationType.valueOf(dis.readUTF());
 
                 List<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>();
                 int cfsSize = dis.readInt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/data/serialization/1.2/streaming.StreamHeader.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/streaming.StreamHeader.bin b/test/data/serialization/1.2/streaming.StreamHeader.bin
index a9f1d39..ac5b7ac 100644
Binary files a/test/data/serialization/1.2/streaming.StreamHeader.bin and b/test/data/serialization/1.2/streaming.StreamHeader.bin
differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/data/serialization/1.2/streaming.StreamReply.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/streaming.StreamReply.bin b/test/data/serialization/1.2/streaming.StreamReply.bin
index 4b74058..6933316 100644
Binary files a/test/data/serialization/1.2/streaming.StreamReply.bin and b/test/data/serialization/1.2/streaming.StreamReply.bin
differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/data/serialization/1.2/streaming.StreamRequestMessage.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/streaming.StreamRequestMessage.bin b/test/data/serialization/1.2/streaming.StreamRequestMessage.bin
index 75af388..fd53579 100644
Binary files a/test/data/serialization/1.2/streaming.StreamRequestMessage.bin and b/test/data/serialization/1.2/streaming.StreamRequestMessage.bin
differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
index 95a7d8b..47990ab 100644
--- a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
@@ -22,10 +22,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 import org.junit.Test;
 
@@ -44,6 +41,7 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
@@ -84,14 +82,15 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testStreamHeaderWrite() throws IOException
     {
-        StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, makePendingFile(true, 100,
OperationType.BOOTSTRAP));
-        StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, makePendingFile(false, 100,
OperationType.BOOTSTRAP));
+        UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
+        StreamHeader sh0 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true,
100, OperationType.BOOTSTRAP));
+        StreamHeader sh1 = new StreamHeader("Keyspace1", sessionId, makePendingFile(false,
100, OperationType.BOOTSTRAP));
         Collection<PendingFile> files = new ArrayList<PendingFile>();
         for (int i = 0; i < 50; i++)
             files.add(makePendingFile(i % 2 == 0, 100, OperationType.BOOTSTRAP));
-        StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100,
OperationType.BOOTSTRAP), files);
-        StreamHeader sh3 = new StreamHeader("Keyspace1", 125L, null, files);
-        StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100,
OperationType.BOOTSTRAP), new ArrayList<PendingFile>());
+        StreamHeader sh2 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true,
100, OperationType.BOOTSTRAP), files);
+        StreamHeader sh3 = new StreamHeader("Keyspace1", sessionId, null, files);
+        StreamHeader sh4 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true,
100, OperationType.BOOTSTRAP), new ArrayList<PendingFile>());
 
         DataOutputStream out = getOutput("streaming.StreamHeader.bin");
         StreamHeader.serializer.serialize(sh0, out, getVersion());
@@ -126,7 +125,8 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testStreamReplyWrite() throws IOException
     {
-        StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED);
+        UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
+        StreamReply rep = new StreamReply("this is a file", sessionId, StreamReply.Status.FILE_FINISHED);
         DataOutputStream out = getOutput("streaming.StreamReply.bin");
         StreamReply.serializer.serialize(rep, out, getVersion());
         rep.createMessage().serialize(out, getVersion());
@@ -159,13 +159,14 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testStreamRequestMessageWrite() throws IOException
     {
+        UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
         Collection<Range<Token>> ranges = new ArrayList<Range<Token>>();
         for (int i = 0; i < 5; i++)
             ranges.add(new Range<Token>(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))),
new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5)))));
         List<ColumnFamilyStore> stores = Collections.singletonList(Table.open("Keyspace1").getColumnFamilyStore("Standard1"));
-        StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges,
"Keyspace1", stores, 123L, OperationType.RESTORE_REPLICA_COUNT);
-        StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true,
100, OperationType.BOOTSTRAP), 124L);
-        StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false,
100, OperationType.BOOTSTRAP), 124L);
+        StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges,
"Keyspace1", stores, sessionId, OperationType.RESTORE_REPLICA_COUNT);
+        StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true,
100, OperationType.BOOTSTRAP), sessionId);
+        StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false,
100, OperationType.BOOTSTRAP), sessionId);
 
         DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin");
         StreamRequest.serializer.serialize(msg0, out, getVersion());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 54d32569..2d96030 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -118,7 +118,7 @@ public class StreamingTransferTest extends SchemaLoader
         List<Range<Token>> ranges = new ArrayList<Range<Token>>();
         ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
         ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
-        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
+        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback)null);
         StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
         session.await();
     }
@@ -260,7 +260,7 @@ public class StreamingTransferTest extends SchemaLoader
         // Acquiring references, transferSSTables needs it
         sstable.acquireReference();
         sstable2.acquireReference();
-        StreamOutSession session = StreamOutSession.create(tablename, LOCAL, null);
+        StreamOutSession session = StreamOutSession.create(tablename, LOCAL, (IStreamCallback)
null);
         StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP);
         session.await();
 
@@ -316,7 +316,7 @@ public class StreamingTransferTest extends SchemaLoader
         if (!SSTableReader.acquireReferences(ssTableReaders))
             throw new AssertionError();
 
-        StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null);
+        StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, (IStreamCallback)null);
         StreamOut.transferSSTables(session, ssTableReaders, ranges, OperationType.BOOTSTRAP);
 
         session.await();


Mime
View raw message