Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EE807D107 for ; Tue, 13 Nov 2012 22:20:11 +0000 (UTC) Received: (qmail 20634 invoked by uid 500); 13 Nov 2012 22:20:11 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 20537 invoked by uid 500); 13 Nov 2012 22:20:11 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 20304 invoked by uid 99); 13 Nov 2012 22:20:10 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Nov 2012 22:20:10 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9570055AA1; Tue, 13 Nov 2012 22:20:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukim@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [4/6] git commit: change stream session ID from (host, counter) to TimeUUID; patch by yukim reviewed by Michael Kjellman for CASSANDRA-4813 Message-Id: <20121113222010.9570055AA1@tyr.zones.apache.org> Date: Tue, 13 Nov 2012 22:20:10 +0000 (UTC) change stream session ID from (host, counter) to TimeUUID; patch by yukim reviewed by Michael Kjellman for CASSANDRA-4813 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/901a54a6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/901a54a6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/901a54a6 Branch: refs/heads/trunk Commit: 901a54a6d6c7b6f7276088e67b45f95dcc7b57f7 Parents: 228d1cf Author: Yuki Morishita Authored: Tue Nov 13 16:18:07 2012 -0600 Committer: Yuki Morishita Committed: Tue Nov 13 16:18:07 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/net/OutboundTcpConnectionPool.java | 2 +- .../cassandra/streaming/AbstractStreamSession.java | 20 ++--- .../apache/cassandra/streaming/FileStreamTask.java | 4 +- .../cassandra/streaming/IncomingStreamReader.java | 9 +- .../apache/cassandra/streaming/StreamHeader.java | 54 +++-------- .../cassandra/streaming/StreamInSession.java | 70 +++++---------- .../cassandra/streaming/StreamOutSession.java | 53 ++++------- .../apache/cassandra/streaming/StreamReply.java | 10 ++- .../streaming/StreamReplyVerbHandler.java | 2 +- .../apache/cassandra/streaming/StreamRequest.java | 21 +++-- .../serialization/1.2/streaming.StreamHeader.bin | Bin 175902 -> 175917 bytes .../serialization/1.2/streaming.StreamReply.bin | Bin 73 -> 89 bytes .../1.2/streaming.StreamRequestMessage.bin | Bin 7167 -> 7215 bytes .../cassandra/streaming/SerializationsTest.java | 27 +++--- .../cassandra/streaming/StreamingTransferTest.java | 6 +- 16 files changed, 106 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 518d3ec..4f54598 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ * Exclude gcable tombstones from merkle-tree computation (CASSANDRA-4905) * Better printing of AbstractBounds for tracing (CASSANDRA-4931) * Optimize mostRecentTomstone check in CC.collectAllData (CASSANDRA-4883) + * Change stream session ID to UUID to avoid collision from same node (CASSANDRA-4813) Merged from 1.1: * reset getRangeSlice filter after finishing a row for get_paged_slice (CASSANDRA-4919) http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index 4d9ce63..237363d 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -124,7 +124,7 @@ public class OutboundTcpConnectionPool else { Socket socket = SocketChannel.open(new InetSocketAddress(endPoint(), DatabaseDescriptor.getStoragePort())).socket(); - if (Config.getOutboundBindAny()) + if (Config.getOutboundBindAny() && !socket.isBound()) socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0)); return socket; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java index d190506..dd7d922 100644 --- a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java +++ b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java @@ -19,6 +19,7 @@ package org.apache.cassandra.streaming; import java.net.InetAddress; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.LoggerFactory; @@ -32,33 +33,30 @@ public abstract class AbstractStreamSession implements IEndpointStateChangeSubsc { private static final Logger logger = LoggerFactory.getLogger(AbstractStreamSession.class); + protected final InetAddress host; + protected final UUID sessionId; protected String table; - protected Pair context; protected final IStreamCallback callback; private final AtomicBoolean isClosed = new AtomicBoolean(false); - protected AbstractStreamSession(String table, Pair context, IStreamCallback callback) + protected AbstractStreamSession(String table, InetAddress host, UUID sessionId, IStreamCallback callback) { + this.host = host; + this.sessionId = sessionId; this.table = table; - this.context = context; this.callback = callback; Gossiper.instance.register(this); FailureDetector.instance.registerFailureDetectionEventListener(this); } - public int getSourceFlag() + public UUID getSessionId() { - return (int)(context.right >> 32); - } - - public long getSessionId() - { - return context.right; + return sessionId; } public InetAddress getHost() { - return context.left; + return host; } public void close(boolean success) http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/FileStreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java index f4162de..67d5c35 100644 --- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java +++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java @@ -90,7 +90,7 @@ public class FileStreamTask extends WrappedRunnable // (at this point, if we fail, it is the receiver's job to re-request) stream(); - StreamOutSession session = StreamOutSession.get(to, header.sessionId); + StreamOutSession session = StreamOutSession.get(header.sessionId); if (session == null) { logger.info("Found no stream out session at end of file stream task - this is expected if the receiver went down"); @@ -104,7 +104,7 @@ public class FileStreamTask extends WrappedRunnable } catch (IOException e) { - StreamOutSession session = StreamOutSession.get(to, header.sessionId); + StreamOutSession session = StreamOutSession.get(header.sessionId); if (session != null) session.close(false); throw e; http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java index b4bea58..656a99d 100644 --- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java @@ -60,16 +60,15 @@ public class IncomingStreamReader public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException { socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); - InetAddress host = header.broadcastAddress != null ? header.broadcastAddress - : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress(); + InetAddress host = ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress(); if (header.pendingFiles.isEmpty() && header.file != null) { // StreamInSession should be created already when receiving 2nd and after files - if (!StreamInSession.hasSession(host, header.sessionId)) + if (!StreamInSession.hasSession(header.sessionId)) { StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE); OutboundTcpConnection.write(reply.createMessage(), - Long.toString(header.sessionId), + header.sessionId.toString(), System.currentTimeMillis(), new DataOutputStream(socket.getOutputStream()), MessagingService.instance().getVersion(host)); @@ -98,7 +97,7 @@ public class IncomingStreamReader { underliningStream = null; } - metrics = StreamingMetrics.get(socket.getInetAddress()); + metrics = StreamingMetrics.get(host); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamHeader.java b/src/java/org/apache/cassandra/streaming/StreamHeader.java index 00a9a43..7f3e654 100644 --- a/src/java/org/apache/cassandra/streaming/StreamHeader.java +++ b/src/java/org/apache/cassandra/streaming/StreamHeader.java @@ -17,59 +17,42 @@ */ package org.apache.cassandra.streaming; -import java.io.*; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.*; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDSerializer; public class StreamHeader { public static final IVersionedSerializer serializer = new StreamHeaderSerializer(); - // Streaming sessionId flags, used to avoid duplicate session id's between nodes. - // See StreamInSession and StreamOutSession - public static final int STREAM_IN_SOURCE_FLAG = 0; - public static final int STREAM_OUT_SOURCE_FLAG = 1; - public final String table; /** file being sent on initial stream */ public final PendingFile file; - /** session is tuple of (host, sessionid) */ - public final long sessionId; + /** session ID */ + public final UUID sessionId; /** files to add to the session */ public final Collection pendingFiles; - /** Address of the sender **/ - public final InetAddress broadcastAddress; - - public StreamHeader(String table, long sessionId, PendingFile file) + public StreamHeader(String table, UUID sessionId, PendingFile file) { this(table, sessionId, file, Collections.emptyList()); } - public StreamHeader(String table, long sessionId, PendingFile first, Collection pendingFiles) - { - this(table, sessionId, first, pendingFiles, FBUtilities.getBroadcastAddress()); - } - - public StreamHeader(String table, long sessionId, PendingFile first, Collection pendingFiles, InetAddress broadcastAddress) + public StreamHeader(String table, UUID sessionId, PendingFile first, Collection pendingFiles) { this.table = table; this.sessionId = sessionId; this.file = first; this.pendingFiles = pendingFiles; - this.broadcastAddress = broadcastAddress; } private static class StreamHeaderSerializer implements IVersionedSerializer @@ -77,32 +60,24 @@ public class StreamHeader public void serialize(StreamHeader sh, DataOutput dos, int version) throws IOException { dos.writeUTF(sh.table); - dos.writeLong(sh.sessionId); + UUIDSerializer.serializer.serialize(sh.sessionId, dos, MessagingService.current_version); PendingFile.serializer.serialize(sh.file, dos, version); dos.writeInt(sh.pendingFiles.size()); - for(PendingFile file : sh.pendingFiles) - { + for (PendingFile file : sh.pendingFiles) PendingFile.serializer.serialize(file, dos, version); - } - CompactEndpointSerializationHelper.serialize(sh.broadcastAddress, dos); } public StreamHeader deserialize(DataInput dis, int version) throws IOException { String table = dis.readUTF(); - long sessionId = dis.readLong(); + UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version); PendingFile file = PendingFile.serializer.deserialize(dis, version); int size = dis.readInt(); List pendingFiles = new ArrayList(size); for (int i = 0; i < size; i++) - { pendingFiles.add(PendingFile.serializer.deserialize(dis, version)); - } - InetAddress bca = null; - if (version > MessagingService.VERSION_10) - bca = CompactEndpointSerializationHelper.deserialize(dis); - return new StreamHeader(table, sessionId, file, pendingFiles, bca); + return new StreamHeader(table, sessionId, file, pendingFiles); } public long serializedSize(StreamHeader sh, int version) @@ -111,9 +86,8 @@ public class StreamHeader size += TypeSizes.NATIVE.sizeof(sh.sessionId); size += PendingFile.serializer.serializedSize(sh.file, version); size += TypeSizes.NATIVE.sizeof(sh.pendingFiles.size()); - for(PendingFile file : sh.pendingFiles) + for (PendingFile file : sh.pendingFiles) size += PendingFile.serializer.serializedSize(file, version); - size += CompactEndpointSerializationHelper.serializedSize(sh.broadcastAddress); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamInSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java index f8bde91..a61ceb7 100644 --- a/src/java/org/apache/cassandra/streaming/StreamInSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java @@ -23,88 +23,62 @@ import java.net.InetAddress; import java.net.Socket; import java.util.*; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.net.MessagingService; +import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Table; -import org.apache.cassandra.gms.*; +import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.OutboundTcpConnection; -import org.apache.cassandra.utils.Pair; -import org.cliffc.high_scale_lib.NonBlockingHashMap; -import org.cliffc.high_scale_lib.NonBlockingHashSet; +import org.apache.cassandra.utils.UUIDGen; /** each context gets its own StreamInSession. So there may be >1 Session per host */ public class StreamInSession extends AbstractStreamSession { private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class); - private static final ConcurrentMap, StreamInSession> sessions = new NonBlockingHashMap, StreamInSession>(); + private static final ConcurrentMap sessions = new NonBlockingHashMap(); private final Set files = new NonBlockingHashSet(); private final List readers = new ArrayList(); private PendingFile current; private Socket socket; private volatile int retries; - private final static AtomicInteger sessionIdCounter = new AtomicInteger(0); - - /** - * The next session id is a combination of a local integer counter and a flag used to avoid collisions - * between session id's generated on different machines. Nodes can may have StreamOutSessions with the - * following contexts: - * - * <1.1.1.1, (stream_in_flag, 6)> - * <1.1.1.1, (stream_out_flag, 6)> - * - * The first is an out stream created in response to a request from node 1.1.1.1. The id (6) was created by - * the requesting node. The second is an out stream created by this node to push to 1.1.1.1. The id (6) was - * created by this node. - * - * Note: The StreamInSession results in a StreamOutSession on the target that uses the StreamInSession sessionId. - * - * @return next StreamInSession sessionId - */ - private static long nextSessionId() - { - return (((long)StreamHeader.STREAM_IN_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet()); - } - private StreamInSession(Pair context, IStreamCallback callback) + private StreamInSession(InetAddress host, UUID sessionId, IStreamCallback callback) { - super(null, context, callback); + super(null, host, sessionId, callback); } public static StreamInSession create(InetAddress host, IStreamCallback callback) { - Pair context = Pair.create(host, nextSessionId()); - StreamInSession session = new StreamInSession(context, callback); - sessions.put(context, session); + StreamInSession session = new StreamInSession(host, UUIDGen.makeType1UUIDFromHost(host), callback); + sessions.put(session.getSessionId(), session); return session; } - public static StreamInSession get(InetAddress host, long sessionId) + public static StreamInSession get(InetAddress host, UUID sessionId) { - Pair context = Pair.create(host, sessionId); - StreamInSession session = sessions.get(context); + StreamInSession session = sessions.get(sessionId); if (session == null) { - StreamInSession possibleNew = new StreamInSession(context, null); - if ((session = sessions.putIfAbsent(context, possibleNew)) == null) + StreamInSession possibleNew = new StreamInSession(host, sessionId, null); + if ((session = sessions.putIfAbsent(sessionId, possibleNew)) == null) session = possibleNew; } return session; } - public static boolean hasSession(InetAddress host, long sessionId) + public static boolean hasSession(UUID sessionId) { - Pair context = Pair.create(host, sessionId); - return sessions.get(context) != null; + return sessions.get(sessionId) != null; } public void setCurrentFile(PendingFile file) @@ -227,7 +201,7 @@ public class StreamInSession extends AbstractStreamSession { if (socket != null) OutboundTcpConnection.write(reply.createMessage(), - context.right.toString(), + sessionId.toString(), System.currentTimeMillis(), new DataOutputStream(socket.getOutputStream()), MessagingService.instance().getVersion(getHost())); @@ -246,7 +220,7 @@ public class StreamInSession extends AbstractStreamSession protected void closeInternal(boolean success) { - sessions.remove(context); + sessions.remove(sessionId); if (!success && FailureDetector.instance.isAlive(getHost())) { StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE); @@ -269,11 +243,11 @@ public class StreamInSession extends AbstractStreamSession public static Set getIncomingFiles(InetAddress host) { Set set = new HashSet(); - for (Map.Entry, StreamInSession> entry : sessions.entrySet()) + for (Map.Entry entry : sessions.entrySet()) { - if (entry.getKey().left.equals(host)) + StreamInSession session = entry.getValue(); + if (session.getHost().equals(host)) { - StreamInSession session = entry.getValue(); if (session.current != null) set.add(session.current); set.addAll(session.files); http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamOutSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java index e1f42dc..ad312ff 100644 --- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java @@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.UUIDGen; + import org.cliffc.high_scale_lib.NonBlockingHashMap; /** @@ -38,57 +40,37 @@ public class StreamOutSession extends AbstractStreamSession private static final Logger logger = LoggerFactory.getLogger(StreamOutSession.class); // one host may have multiple stream sessions. - private static final ConcurrentMap, StreamOutSession> streams = new NonBlockingHashMap, StreamOutSession>(); - private final static AtomicInteger sessionIdCounter = new AtomicInteger(0); - - /** - * The next session id is a combination of a local integer counter and a flag used to avoid collisions - * between session id's generated on different machines. Nodes can may have StreamOutSessions with the - * following contexts: - * - * <1.1.1.1, (stream_in_flag, 6)> - * <1.1.1.1, (stream_out_flag, 6)> - * - * The first is an out stream created in response to a request from node 1.1.1.1. The id (6) was created by - * the requesting node. The second is an out stream created by this node to push to 1.1.1.1. The id (6) was - * created by this node. - * @return next StreamOutSession sessionId - */ - private static long nextSessionId() - { - return (((long)StreamHeader.STREAM_OUT_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet()); - } + private static final ConcurrentMap streams = new NonBlockingHashMap(); public static StreamOutSession create(String table, InetAddress host, IStreamCallback callback) { - return create(table, host, nextSessionId(), callback); + return create(table, host, UUIDGen.makeType1UUIDFromHost(host), callback); } - public static StreamOutSession create(String table, InetAddress host, long sessionId) + public static StreamOutSession create(String table, InetAddress host, UUID sessionId) { return create(table, host, sessionId, null); } - public static StreamOutSession create(String table, InetAddress host, long sessionId, IStreamCallback callback) + public static StreamOutSession create(String table, InetAddress host, UUID sessionId, IStreamCallback callback) { - Pair context = Pair.create(host, sessionId); - StreamOutSession session = new StreamOutSession(table, context, callback); - streams.put(context, session); + StreamOutSession session = new StreamOutSession(table, host, sessionId, callback); + streams.put(sessionId, session); return session; } - public static StreamOutSession get(InetAddress host, long sessionId) + public static StreamOutSession get(UUID sessionId) { - return streams.get(Pair.create(host, sessionId)); + return streams.get(sessionId); } private final Map files = new NonBlockingHashMap(); private volatile String currentFile; - private StreamOutSession(String table, Pair context, IStreamCallback callback) + private StreamOutSession(String table, InetAddress host, UUID sessionId, IStreamCallback callback) { - super(table, context, callback); + super(table, host, sessionId, callback); } public void addFilesToStream(List pendingFiles) @@ -129,13 +111,13 @@ public class StreamOutSession extends AbstractStreamSession // Release reference on last file (or any uncompleted ones) for (PendingFile file : files.values()) file.sstable.releaseReference(); - streams.remove(context); + streams.remove(sessionId); } /** convenience method for use when testing */ void await() throws InterruptedException { - while (streams.containsKey(context)) + while (streams.containsKey(sessionId)) Thread.sleep(10); } @@ -157,10 +139,11 @@ public class StreamOutSession extends AbstractStreamSession public static List getOutgoingFiles(InetAddress host) { List list = new ArrayList(); - for (Map.Entry, StreamOutSession> entry : streams.entrySet()) + for (Map.Entry entry : streams.entrySet()) { - if (entry.getKey().left.equals(host)) - list.addAll(entry.getValue().getFiles()); + StreamOutSession session = entry.getValue(); + if (session.getHost().equals(host)) + list.addAll(session.getFiles()); } return list; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamReply.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java index bfb65e3..eee8e37 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReply.java +++ b/src/java/org/apache/cassandra/streaming/StreamReply.java @@ -20,11 +20,13 @@ package org.apache.cassandra.streaming; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.UUID; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.UUIDSerializer; public class StreamReply { @@ -38,11 +40,11 @@ public class StreamReply public static final IVersionedSerializer serializer = new FileStatusSerializer(); - public final long sessionId; + public final UUID sessionId; public final String file; public final Status action; - public StreamReply(String file, long sessionId, Status action) + public StreamReply(String file, UUID sessionId, Status action) { this.file = file; this.action = action; @@ -68,14 +70,14 @@ public class StreamReply { public void serialize(StreamReply reply, DataOutput dos, int version) throws IOException { - dos.writeLong(reply.sessionId); + UUIDSerializer.serializer.serialize(reply.sessionId, dos, MessagingService.current_version); dos.writeUTF(reply.file); dos.writeInt(reply.action.ordinal()); } public StreamReply deserialize(DataInput dis, int version) throws IOException { - long sessionId = dis.readLong(); + UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version); String targetFile = dis.readUTF(); Status action = Status.values()[dis.readInt()]; return new StreamReply(targetFile, sessionId, action); http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java index 714f76a..ebcee8a 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java +++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java @@ -31,7 +31,7 @@ public class StreamReplyVerbHandler implements IVerbHandler { StreamReply reply = message.payload; logger.debug("Received StreamReply {}", reply); - StreamOutSession session = StreamOutSession.get(message.from, reply.sessionId); + StreamOutSession session = StreamOutSession.get(reply.sessionId); if (session == null) { logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.from); http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java index a8de4a6..b49fb36 100644 --- a/src/java/org/apache/cassandra/streaming/StreamRequest.java +++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java @@ -24,6 +24,7 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.UUID; import com.google.common.collect.Iterables; @@ -38,6 +39,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.UUIDSerializer; /** * This class encapsulates the message that needs to be sent to nodes @@ -50,19 +52,19 @@ public class StreamRequest { public static final IVersionedSerializer serializer = new StreamRequestSerializer(); - protected final long sessionId; + protected final UUID sessionId; protected final InetAddress target; // if this is specified, ranges and table should not be. protected final PendingFile file; - // if these are specified, file shoud not be. + // if these are specified, file should not be. protected final Collection> ranges; protected final String table; protected final Iterable columnFamilies; protected final OperationType type; - StreamRequest(InetAddress target, Collection> ranges, String table, Iterable columnFamilies, long sessionId, OperationType type) + StreamRequest(InetAddress target, Collection> ranges, String table, Iterable columnFamilies, UUID sessionId, OperationType type) { this.target = target; this.ranges = ranges; @@ -73,7 +75,7 @@ public class StreamRequest file = null; } - StreamRequest(InetAddress target, PendingFile file, long sessionId) + StreamRequest(InetAddress target, PendingFile file, UUID sessionId) { this.target = target; this.file = file; @@ -100,7 +102,7 @@ public class StreamRequest sb.append("@"); sb.append(target); sb.append("------->"); - for ( Range range : ranges ) + for (Range range : ranges) { sb.append(range); sb.append(" "); @@ -118,7 +120,7 @@ public class StreamRequest { public void serialize(StreamRequest srm, DataOutput dos, int version) throws IOException { - dos.writeLong(srm.sessionId); + UUIDSerializer.serializer.serialize(srm.sessionId, dos, MessagingService.current_version); CompactEndpointSerializationHelper.serialize(srm.target, dos); if (srm.file != null) { @@ -143,7 +145,7 @@ public class StreamRequest public StreamRequest deserialize(DataInput dis, int version) throws IOException { - long sessionId = dis.readLong(); + UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version); InetAddress target = CompactEndpointSerializationHelper.deserialize(dis); boolean singleFile = dis.readBoolean(); if (singleFile) @@ -156,10 +158,9 @@ public class StreamRequest String table = dis.readUTF(); int size = dis.readInt(); List> ranges = (size == 0) ? null : new ArrayList>(size); - for( int i = 0; i < size; ++i ) + for (int i = 0; i < size; ++i) ranges.add((Range) AbstractBounds.serializer.deserialize(dis, version).toTokenBounds()); - OperationType type = OperationType.RESTORE_REPLICA_COUNT; - type = OperationType.valueOf(dis.readUTF()); + OperationType type = OperationType.valueOf(dis.readUTF()); List stores = new ArrayList(); int cfsSize = dis.readInt(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/data/serialization/1.2/streaming.StreamHeader.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/1.2/streaming.StreamHeader.bin b/test/data/serialization/1.2/streaming.StreamHeader.bin index a9f1d39..ac5b7ac 100644 Binary files a/test/data/serialization/1.2/streaming.StreamHeader.bin and b/test/data/serialization/1.2/streaming.StreamHeader.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/data/serialization/1.2/streaming.StreamReply.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/1.2/streaming.StreamReply.bin b/test/data/serialization/1.2/streaming.StreamReply.bin index 4b74058..6933316 100644 Binary files a/test/data/serialization/1.2/streaming.StreamReply.bin and b/test/data/serialization/1.2/streaming.StreamReply.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/data/serialization/1.2/streaming.StreamRequestMessage.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/1.2/streaming.StreamRequestMessage.bin b/test/data/serialization/1.2/streaming.StreamRequestMessage.bin index 75af388..fd53579 100644 Binary files a/test/data/serialization/1.2/streaming.StreamRequestMessage.bin and b/test/data/serialization/1.2/streaming.StreamRequestMessage.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/unit/org/apache/cassandra/streaming/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java index 95a7d8b..47990ab 100644 --- a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java @@ -22,10 +22,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; +import java.util.*; import org.junit.Test; @@ -44,6 +41,7 @@ import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.UUIDGen; public class SerializationsTest extends AbstractSerializationsTester { @@ -84,14 +82,15 @@ public class SerializationsTest extends AbstractSerializationsTester private void testStreamHeaderWrite() throws IOException { - StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, makePendingFile(true, 100, OperationType.BOOTSTRAP)); - StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, makePendingFile(false, 100, OperationType.BOOTSTRAP)); + UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()); + StreamHeader sh0 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP)); + StreamHeader sh1 = new StreamHeader("Keyspace1", sessionId, makePendingFile(false, 100, OperationType.BOOTSTRAP)); Collection files = new ArrayList(); for (int i = 0; i < 50; i++) files.add(makePendingFile(i % 2 == 0, 100, OperationType.BOOTSTRAP)); - StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100, OperationType.BOOTSTRAP), files); - StreamHeader sh3 = new StreamHeader("Keyspace1", 125L, null, files); - StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100, OperationType.BOOTSTRAP), new ArrayList()); + StreamHeader sh2 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP), files); + StreamHeader sh3 = new StreamHeader("Keyspace1", sessionId, null, files); + StreamHeader sh4 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP), new ArrayList()); DataOutputStream out = getOutput("streaming.StreamHeader.bin"); StreamHeader.serializer.serialize(sh0, out, getVersion()); @@ -126,7 +125,8 @@ public class SerializationsTest extends AbstractSerializationsTester private void testStreamReplyWrite() throws IOException { - StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED); + UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()); + StreamReply rep = new StreamReply("this is a file", sessionId, StreamReply.Status.FILE_FINISHED); DataOutputStream out = getOutput("streaming.StreamReply.bin"); StreamReply.serializer.serialize(rep, out, getVersion()); rep.createMessage().serialize(out, getVersion()); @@ -159,13 +159,14 @@ public class SerializationsTest extends AbstractSerializationsTester private void testStreamRequestMessageWrite() throws IOException { + UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()); Collection> ranges = new ArrayList>(); for (int i = 0; i < 5; i++) ranges.add(new Range(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5))))); List stores = Collections.singletonList(Table.open("Keyspace1").getColumnFamilyStore("Standard1")); - StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges, "Keyspace1", stores, 123L, OperationType.RESTORE_REPLICA_COUNT); - StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), 124L); - StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), 124L); + StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges, "Keyspace1", stores, sessionId, OperationType.RESTORE_REPLICA_COUNT); + StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), sessionId); + StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), sessionId); DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin"); StreamRequest.serializer.serialize(msg0, out, getVersion()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 54d32569..2d96030 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -118,7 +118,7 @@ public class StreamingTransferTest extends SchemaLoader List> ranges = new ArrayList>(); ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1")))); ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); - StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null); + StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback)null); StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP); session.await(); } @@ -260,7 +260,7 @@ public class StreamingTransferTest extends SchemaLoader // Acquiring references, transferSSTables needs it sstable.acquireReference(); sstable2.acquireReference(); - StreamOutSession session = StreamOutSession.create(tablename, LOCAL, null); + StreamOutSession session = StreamOutSession.create(tablename, LOCAL, (IStreamCallback) null); StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP); session.await(); @@ -316,7 +316,7 @@ public class StreamingTransferTest extends SchemaLoader if (!SSTableReader.acquireReferences(ssTableReaders)) throw new AssertionError(); - StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null); + StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, (IStreamCallback)null); StreamOut.transferSSTables(session, ssTableReaders, ranges, OperationType.BOOTSTRAP); session.await();