cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [8/15] git commit: Automated refactor: rename Message out of class names that aren't MessageIn, MessageOut, or related
Date Tue, 08 May 2012 17:56:15 GMT
Automated refactor: rename Message out of class names that aren't MessageIn, MessageOut, or related


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9471e8d0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9471e8d0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9471e8d0

Branch: refs/heads/trunk
Commit: 9471e8d0bcf77569993064e6151595fa7993ebd1
Parents: a06be23
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Mon Mar 26 17:53:36 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Tue May 8 12:40:53 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/gms/GossipDigestAck.java  |   88 +++++++
 .../org/apache/cassandra/gms/GossipDigestAck2.java |   77 ++++++
 .../cassandra/gms/GossipDigestAck2Message.java     |   77 ------
 .../cassandra/gms/GossipDigestAck2VerbHandler.java |    7 +-
 .../cassandra/gms/GossipDigestAckMessage.java      |   88 -------
 .../cassandra/gms/GossipDigestAckVerbHandler.java  |   15 +-
 .../org/apache/cassandra/gms/GossipDigestSyn.java  |  137 ++++++++++
 .../cassandra/gms/GossipDigestSynMessage.java      |  137 ----------
 .../cassandra/gms/GossipDigestSynVerbHandler.java  |   15 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    |   22 +-
 .../org/apache/cassandra/net/MessagingService.java |   14 +-
 .../org/apache/cassandra/streaming/StreamIn.java   |    2 +-
 .../apache/cassandra/streaming/StreamRequest.java  |  197 +++++++++++++++
 .../cassandra/streaming/StreamRequestMessage.java  |  197 ---------------
 .../streaming/StreamRequestVerbHandler.java        |   11 +-
 .../apache/cassandra/gms/SerializationsTest.java   |   20 +-
 .../cassandra/streaming/SerializationsTest.java    |   18 +-
 .../org/apache/cassandra/streaming/StreamUtil.java |    5 +-
 18 files changed, 555 insertions(+), 572 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/gms/GossipDigestAck.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck.java b/src/java/org/apache/cassandra/gms/GossipDigestAck.java
new file mode 100644
index 0000000..64a6e3c
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.gms;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+
+
+
+/**
+ * This ack gets sent out as a result of the receipt of a GossipDigestSynMessage by an
+ * endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol.
+ */
+
+public class GossipDigestAck
+{
+    private static final IVersionedSerializer<GossipDigestAck> serializer;
+    static
+    {
+        serializer = new GossipDigestAckSerializer();
+    }
+
+    final List<GossipDigest> gDigestList;
+    final Map<InetAddress, EndpointState> epStateMap;
+
+    public static IVersionedSerializer<GossipDigestAck> serializer()
+    {
+        return serializer;
+    }
+
+    GossipDigestAck(List<GossipDigest> gDigestList, Map<InetAddress, EndpointState> epStateMap)
+    {
+        this.gDigestList = gDigestList;
+        this.epStateMap = epStateMap;
+    }
+
+    List<GossipDigest> getGossipDigestList()
+    {
+        return gDigestList;
+    }
+
+    Map<InetAddress, EndpointState> getEndpointStateMap()
+    {
+        return epStateMap;
+    }
+}
+
+class GossipDigestAckSerializer implements IVersionedSerializer<GossipDigestAck>
+{
+    public void serialize(GossipDigestAck gDigestAckMessage, DataOutput dos, int version) throws IOException
+    {
+        GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList, dos, version);
+        dos.writeBoolean(true); // 0.6 compatibility
+        EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap, dos, version);
+    }
+
+    public GossipDigestAck deserialize(DataInput dis, int version) throws IOException
+    {
+        List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis, version);
+        dis.readBoolean(); // 0.6 compatibility
+        Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis, version);
+        return new GossipDigestAck(gDigestList, epStateMap);
+    }
+
+    public long serializedSize(GossipDigestAck gossipDigestAckMessage, int version)
+    {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
new file mode 100644
index 0000000..a95f43a
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.gms;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.util.Map;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+
+
+/**
+ * This ack gets sent out as a result of the receipt of a GossipDigestAckMessage. This the
+ * last stage of the 3 way messaging of the Gossip protocol.
+ */
+
+public class GossipDigestAck2
+{
+    private static final IVersionedSerializer<GossipDigestAck2> serializer;
+    static
+    {
+        serializer = new GossipDigestAck2Serializer();
+    }
+
+    final Map<InetAddress, EndpointState> epStateMap;
+
+    public static IVersionedSerializer<GossipDigestAck2> serializer()
+    {
+        return serializer;
+    }
+
+    GossipDigestAck2(Map<InetAddress, EndpointState> epStateMap)
+    {
+        this.epStateMap = epStateMap;
+    }
+
+    Map<InetAddress, EndpointState> getEndpointStateMap()
+    {
+         return epStateMap;
+    }
+}
+
+class GossipDigestAck2Serializer implements IVersionedSerializer<GossipDigestAck2>
+{
+    public void serialize(GossipDigestAck2 gDigestAck2Message, DataOutput dos, int version) throws IOException
+    {
+        /* Use the EndpointState */
+        EndpointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap, dos, version);
+    }
+
+    public GossipDigestAck2 deserialize(DataInput dis, int version) throws IOException
+    {
+        Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis, version);
+        return new GossipDigestAck2(epStateMap);
+    }
+
+    public long serializedSize(GossipDigestAck2 gossipDigestAck2Message, int version)
+    {
+        throw new UnsupportedOperationException();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
deleted file mode 100644
index 609331f..0000000
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.gms;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.util.Map;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-
-
-/**
- * This message gets sent out as a result of the receipt of a GossipDigestAckMessage. This the
- * last stage of the 3 way messaging of the Gossip protocol.
- */
-
-public class GossipDigestAck2Message
-{
-    private static final IVersionedSerializer<GossipDigestAck2Message> serializer;
-    static
-    {
-        serializer = new GossipDigestAck2MessageSerializer();
-    }
-
-    final Map<InetAddress, EndpointState> epStateMap;
-
-    public static IVersionedSerializer<GossipDigestAck2Message> serializer()
-    {
-        return serializer;
-    }
-
-    GossipDigestAck2Message(Map<InetAddress, EndpointState> epStateMap)
-    {
-        this.epStateMap = epStateMap;
-    }
-
-    Map<InetAddress, EndpointState> getEndpointStateMap()
-    {
-         return epStateMap;
-    }
-}
-
-class GossipDigestAck2MessageSerializer implements IVersionedSerializer<GossipDigestAck2Message>
-{
-    public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutput dos, int version) throws IOException
-    {
-        /* Use the EndpointState */
-        EndpointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap, dos, version);
-    }
-
-    public GossipDigestAck2Message deserialize(DataInput dis, int version) throws IOException
-    {
-        Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis, version);
-        return new GossipDigestAck2Message(epStateMap);
-    }
-
-    public long serializedSize(GossipDigestAck2Message gossipDigestAck2Message, int version)
-    {
-        throw new UnsupportedOperationException();
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
index 42080bd..3fb9255 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
@@ -17,23 +17,20 @@
  */
 package org.apache.cassandra.gms;
 
-import java.io.DataInputStream;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 
-public class GossipDigestAck2VerbHandler implements IVerbHandler<GossipDigestAck2Message>
+public class GossipDigestAck2VerbHandler implements IVerbHandler<GossipDigestAck2>
 {
     private static final Logger logger = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
 
-    public void doVerb(MessageIn<GossipDigestAck2Message> message, String id)
+    public void doVerb(MessageIn<GossipDigestAck2> message, String id)
     {
         if (logger.isTraceEnabled())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java b/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
deleted file mode 100644
index 60706cb..0000000
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.gms;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-
-
-
-/**
- * This message gets sent out as a result of the receipt of a GossipDigestSynMessage by an
- * endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol.
- */
-
-public class GossipDigestAckMessage // TODO rename
-{
-    private static final IVersionedSerializer<GossipDigestAckMessage> serializer;
-    static
-    {
-        serializer = new GossipDigestAckMessageSerializer();
-    }
-
-    final List<GossipDigest> gDigestList;
-    final Map<InetAddress, EndpointState> epStateMap;
-
-    public static IVersionedSerializer<GossipDigestAckMessage> serializer()
-    {
-        return serializer;
-    }
-
-    GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<InetAddress, EndpointState> epStateMap)
-    {
-        this.gDigestList = gDigestList;
-        this.epStateMap = epStateMap;
-    }
-
-    List<GossipDigest> getGossipDigestList()
-    {
-        return gDigestList;
-    }
-
-    Map<InetAddress, EndpointState> getEndpointStateMap()
-    {
-        return epStateMap;
-    }
-}
-
-class GossipDigestAckMessageSerializer implements IVersionedSerializer<GossipDigestAckMessage>
-{
-    public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutput dos, int version) throws IOException
-    {
-        GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList, dos, version);
-        dos.writeBoolean(true); // 0.6 compatibility
-        EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap, dos, version);
-    }
-
-    public GossipDigestAckMessage deserialize(DataInput dis, int version) throws IOException
-    {
-        List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis, version);
-        dis.readBoolean(); // 0.6 compatibility
-        Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis, version);
-        return new GossipDigestAckMessage(gDigestList, epStateMap);
-    }
-
-    public long serializedSize(GossipDigestAckMessage gossipDigestAckMessage, int version)
-    {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 69fa5cf..21c52b5 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.gms;
 
-import java.io.DataInputStream;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
@@ -27,17 +25,16 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
-public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAckMessage>
+public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
 {
     private static final Logger logger = LoggerFactory.getLogger(GossipDigestAckVerbHandler.class);
 
-    public void doVerb(MessageIn<GossipDigestAckMessage> message, String id)
+    public void doVerb(MessageIn<GossipDigestAck> message, String id)
     {
         InetAddress from = message.from;
         if (logger.isTraceEnabled())
@@ -49,7 +46,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAckM
             return;
         }
 
-        GossipDigestAckMessage gDigestAckMessage = message.payload;
+        GossipDigestAck gDigestAckMessage = message.payload;
         List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
         Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
 
@@ -70,9 +67,9 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAckM
                 deltaEpStateMap.put(addr, localEpStatePtr);
         }
 
-        MessageOut<GossipDigestAck2Message> gDigestAck2Message = new MessageOut<GossipDigestAck2Message>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
-                                                                                                         new GossipDigestAck2Message(deltaEpStateMap),
-                                                                                                         GossipDigestAck2Message.serializer());
+        MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
+                                                                                                         new GossipDigestAck2(deltaEpStateMap),
+                                                                                                         GossipDigestAck2.serializer());
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestAck2Message to {}", from);
         MessagingService.instance().sendOneWay(gDigestAck2Message, from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
new file mode 100644
index 0000000..e1dd59d
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.gms;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+
+
+/**
+ * This is the first message that gets sent out as a start of the Gossip protocol in a
+ * round.
+ */
+
+public class GossipDigestSyn
+{
+    private static final IVersionedSerializer<GossipDigestSyn> serializer;
+    static
+    {
+        serializer = new GossipDigestSynSerializer();
+    }
+
+    final String clusterId;
+    final List<GossipDigest> gDigests;
+
+    public static IVersionedSerializer<GossipDigestSyn> serializer()
+    {
+        return serializer;
+    }
+
+    public GossipDigestSyn(String clusterId, List<GossipDigest> gDigests)
+    {
+        this.clusterId = clusterId;
+        this.gDigests = gDigests;
+    }
+
+    List<GossipDigest> getGossipDigests()
+    {
+        return gDigests;
+    }
+}
+
+class GossipDigestSerializationHelper
+{
+    static void serialize(List<GossipDigest> gDigestList, DataOutput dos, int version) throws IOException
+    {
+        dos.writeInt(gDigestList.size());
+        for ( GossipDigest gDigest : gDigestList )
+        {
+            GossipDigest.serializer().serialize( gDigest, dos, version);
+        }
+    }
+
+    static List<GossipDigest> deserialize(DataInput dis, int version) throws IOException
+    {
+        int size = dis.readInt();
+        List<GossipDigest> gDigests = new ArrayList<GossipDigest>(size);
+
+        for ( int i = 0; i < size; ++i )
+        {
+            gDigests.add(GossipDigest.serializer().deserialize(dis, version));
+        }
+        return gDigests;
+    }
+}
+
+class EndpointStatesSerializationHelper
+{
+    static void serialize(Map<InetAddress, EndpointState> epStateMap, DataOutput dos, int version) throws IOException
+    {
+        dos.writeInt(epStateMap.size());
+        for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
+        {
+            InetAddress ep = entry.getKey();
+            CompactEndpointSerializationHelper.serialize(ep, dos);
+            EndpointState.serializer().serialize(entry.getValue(), dos, version);
+        }
+    }
+
+    static Map<InetAddress, EndpointState> deserialize(DataInput dis, int version) throws IOException
+    {
+        int size = dis.readInt();
+        Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>(size);
+
+        for ( int i = 0; i < size; ++i )
+        {
+            InetAddress ep = CompactEndpointSerializationHelper.deserialize(dis);
+            EndpointState epState = EndpointState.serializer().deserialize(dis, version);
+            epStateMap.put(ep, epState);
+        }
+        return epStateMap;
+    }
+}
+
+class GossipDigestSynSerializer implements IVersionedSerializer<GossipDigestSyn>
+{
+    public void serialize(GossipDigestSyn gDigestSynMessage, DataOutput dos, int version) throws IOException
+    {
+        dos.writeUTF(gDigestSynMessage.clusterId);
+        GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests, dos, version);
+    }
+
+    public GossipDigestSyn deserialize(DataInput dis, int version) throws IOException
+    {
+        String clusterId = dis.readUTF();
+        List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis, version);
+        return new GossipDigestSyn(clusterId, gDigests);
+    }
+
+    public long serializedSize(GossipDigestSyn gossipDigestSynMessage, int version)
+    {
+        throw new UnsupportedOperationException();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java b/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
deleted file mode 100644
index 05e210f..0000000
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.gms;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-
-
-/**
- * This is the first message that gets sent out as a start of the Gossip protocol in a
- * round.
- */
-
-public class GossipDigestSynMessage
-{
-    private static final IVersionedSerializer<GossipDigestSynMessage> serializer;
-    static
-    {
-        serializer = new GossipDigestSynMessageSerializer();
-    }
-
-    final String clusterId;
-    final List<GossipDigest> gDigests;
-
-    public static IVersionedSerializer<GossipDigestSynMessage> serializer()
-    {
-        return serializer;
-    }
-
-    public GossipDigestSynMessage(String clusterId, List<GossipDigest> gDigests)
-    {
-        this.clusterId = clusterId;
-        this.gDigests = gDigests;
-    }
-
-    List<GossipDigest> getGossipDigests()
-    {
-        return gDigests;
-    }
-}
-
-class GossipDigestSerializationHelper
-{
-    static void serialize(List<GossipDigest> gDigestList, DataOutput dos, int version) throws IOException
-    {
-        dos.writeInt(gDigestList.size());
-        for ( GossipDigest gDigest : gDigestList )
-        {
-            GossipDigest.serializer().serialize( gDigest, dos, version);
-        }
-    }
-
-    static List<GossipDigest> deserialize(DataInput dis, int version) throws IOException
-    {
-        int size = dis.readInt();
-        List<GossipDigest> gDigests = new ArrayList<GossipDigest>(size);
-
-        for ( int i = 0; i < size; ++i )
-        {
-            gDigests.add(GossipDigest.serializer().deserialize(dis, version));
-        }
-        return gDigests;
-    }
-}
-
-class EndpointStatesSerializationHelper
-{
-    static void serialize(Map<InetAddress, EndpointState> epStateMap, DataOutput dos, int version) throws IOException
-    {
-        dos.writeInt(epStateMap.size());
-        for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
-        {
-            InetAddress ep = entry.getKey();
-            CompactEndpointSerializationHelper.serialize(ep, dos);
-            EndpointState.serializer().serialize(entry.getValue(), dos, version);
-        }
-    }
-
-    static Map<InetAddress, EndpointState> deserialize(DataInput dis, int version) throws IOException
-    {
-        int size = dis.readInt();
-        Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>(size);
-
-        for ( int i = 0; i < size; ++i )
-        {
-            InetAddress ep = CompactEndpointSerializationHelper.deserialize(dis);
-            EndpointState epState = EndpointState.serializer().deserialize(dis, version);
-            epStateMap.put(ep, epState);
-        }
-        return epStateMap;
-    }
-}
-
-class GossipDigestSynMessageSerializer implements IVersionedSerializer<GossipDigestSynMessage>
-{
-    public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutput dos, int version) throws IOException
-    {
-        dos.writeUTF(gDigestSynMessage.clusterId);
-        GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests, dos, version);
-    }
-
-    public GossipDigestSynMessage deserialize(DataInput dis, int version) throws IOException
-    {
-        String clusterId = dis.readUTF();
-        List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis, version);
-        return new GossipDigestSynMessage(clusterId, gDigests);
-    }
-
-    public long serializedSize(GossipDigestSynMessage gossipDigestSynMessage, int version)
-    {
-        throw new UnsupportedOperationException();
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 70df9d3..8cd8258 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.gms;
 
-import java.io.DataInputStream;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 
@@ -26,17 +24,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
-public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSynMessage>
+public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn>
 {
     private static final Logger logger = LoggerFactory.getLogger( GossipDigestSynVerbHandler.class);
 
-    public void doVerb(MessageIn<GossipDigestSynMessage> message, String id)
+    public void doVerb(MessageIn<GossipDigestSyn> message, String id)
     {
         InetAddress from = message.from;
         if (logger.isTraceEnabled())
@@ -48,7 +45,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSynM
             return;
         }
 
-        GossipDigestSynMessage gDigestMessage = message.payload;
+        GossipDigestSyn gDigestMessage = message.payload;
         /* If the message is from a different cluster throw it away. */
         if (!gDigestMessage.clusterId.equals(DatabaseDescriptor.getClusterName()))
         {
@@ -76,9 +73,9 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSynM
         Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
         Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
 
-        MessageOut<GossipDigestAckMessage> gDigestAckMessage = new MessageOut<GossipDigestAckMessage>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
-                                                                                                      new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap),
-                                                                                                      GossipDigestAckMessage.serializer());
+        MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
+                                                                                                      new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
+                                                                                                      GossipDigestAck.serializer());
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestAckMessage to {}", from);
         MessagingService.instance().sendOneWay(gDigestAckMessage, from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index f19291c..de5ac0b 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -122,10 +122,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
                 if ( gDigests.size() > 0 )
                 {
-                    GossipDigestSynMessage digestSynMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
-                    MessageOut<GossipDigestSynMessage> message = new MessageOut<GossipDigestSynMessage>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
+                    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(), gDigests);
+                    MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                                                         digestSynMessage,
-                                                                                                        GossipDigestSynMessage.serializer());
+                                                                                                        GossipDigestSyn.serializer());
                     /* Gossip to some random live member */
                     boolean gossipedToSeed = doGossipToLiveMember(message);
 
@@ -507,7 +507,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
      * @param epSet a set of endpoint from which a random endpoint is chosen.
      *  @return true if the chosen endpoint is also a seed.
      */
-    private boolean sendGossip(MessageOut<GossipDigestSynMessage> message, Set<InetAddress> epSet)
+    private boolean sendGossip(MessageOut<GossipDigestSyn> message, Set<InetAddress> epSet)
     {
         int size = epSet.size();
         if (size < 1)
@@ -517,13 +517,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         int index = (size == 1) ? 0 : random.nextInt(size);
         InetAddress to = liveEndpoints.get(index);
         if (logger.isTraceEnabled())
-            logger.trace("Sending a GossipDigestSynMessage to {} ...", to);
+            logger.trace("Sending a GossipDigestSyn to {} ...", to);
         MessagingService.instance().sendOneWay(message, to);
         return seeds.contains(to);
     }
 
     /* Sends a Gossip message to a live member and returns true if the recipient was a seed */
-    private boolean doGossipToLiveMember(MessageOut<GossipDigestSynMessage> message)
+    private boolean doGossipToLiveMember(MessageOut<GossipDigestSyn> message)
     {
         int size = liveEndpoints.size();
         if ( size == 0 )
@@ -532,7 +532,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     }
 
     /* Sends a Gossip message to an unreachable member */
-    private void doGossipToUnreachableMember(MessageOut<GossipDigestSynMessage> message)
+    private void doGossipToUnreachableMember(MessageOut<GossipDigestSyn> message)
     {
         double liveEndpointCount = liveEndpoints.size();
         double unreachableEndpointCount = unreachableEndpoints.size();
@@ -547,7 +547,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     }
 
     /* Gossip to a seed for facilitating partition healing */
-    private void doGossipToSeed(MessageOut<GossipDigestSynMessage> prod)
+    private void doGossipToSeed(MessageOut<GossipDigestSyn> prod)
     {
         int size = seeds.size();
         if ( size > 0 )
@@ -1090,9 +1090,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         {
             throw new RuntimeException(e);
         }
-        MessageOut<GossipDigestSynMessage> message = new MessageOut<GossipDigestSynMessage>(MessagingService.Verb.GOSSIP_SHUTDOWN,
-                                                                                            null,
-                                                                                            GossipDigestSynMessage.serializer());
+        MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_SHUTDOWN,
+                                                                              null,
+                                                                              GossipDigestSyn.serializer());
         for (InetAddress ep : liveEndpoints)
         {
             MessagingService.instance().sendOneWay(message, ep);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ef85a82..4f3b603 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -48,9 +48,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.BootStrapper;
-import org.apache.cassandra.gms.GossipDigestAck2Message;
-import org.apache.cassandra.gms.GossipDigestAckMessage;
-import org.apache.cassandra.gms.GossipDigestSynMessage;
+import org.apache.cassandra.gms.GossipDigestAck2;
+import org.apache.cassandra.gms.GossipDigestAck;
+import org.apache.cassandra.gms.GossipDigestSyn;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.ILatencySubscriber;
@@ -176,16 +176,16 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.READ_REPAIR, RowMutation.serializer());
         put(Verb.READ, ReadCommand.serializer());
         put(Verb.STREAM_REPLY, StreamReply.serializer);
-        put(Verb.STREAM_REQUEST, StreamRequestMessage.serializer());
+        put(Verb.STREAM_REQUEST, StreamRequest.serializer());
         put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
         put(Verb.TREE_REQUEST, AntiEntropyService.TreeRequest.serializer);
         put(Verb.TREE_RESPONSE, AntiEntropyService.Validator.serializer);
         put(Verb.STREAMING_REPAIR_REQUEST, StreamingRepairTask.serializer);
         put(Verb.STREAMING_REPAIR_RESPONSE, UUIDGen.serializer);
-        put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAckMessage.serializer());
-        put(Verb.GOSSIP_DIGEST_ACK2, GossipDigestAck2Message.serializer());
-        put(Verb.GOSSIP_DIGEST_SYN, GossipDigestSynMessage.serializer());
+        put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer());
+        put(Verb.GOSSIP_DIGEST_ACK2, GossipDigestAck2.serializer());
+        put(Verb.GOSSIP_DIGEST_SYN, GossipDigestSyn.serializer());
         put(Verb.DEFINITIONS_UPDATE, MigrationManager.MigrationsSerializer.instance);
         put(Verb.TRUNCATE, Truncation.serializer());
         put(Verb.SCHEMA_CHECK, null);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/streaming/StreamIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java
index 8997156..9f12aff 100644
--- a/src/java/org/apache/cassandra/streaming/StreamIn.java
+++ b/src/java/org/apache/cassandra/streaming/StreamIn.java
@@ -58,7 +58,7 @@ public class StreamIn
         if (logger.isDebugEnabled())
             logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
         StreamInSession session = StreamInSession.create(source, callback);
-        StreamRequestMessage srm = new StreamRequestMessage(FBUtilities.getBroadcastAddress(),
+        StreamRequest srm = new StreamRequest(FBUtilities.getBroadcastAddress(),
                                                             ranges,
                                                             tableName,
                                                             columnFamilies,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
new file mode 100644
index 0000000..3814049
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+* This class encapsulates the message that needs to be sent to nodes
+* that handoff data. The message contains information about ranges
+* that need to be transferred and the target node.
+*
+* If a file is specified, ranges and table will not. vice-versa should hold as well.
+*/
+public class StreamRequest
+{
+    private static final IVersionedSerializer<StreamRequest> serializer;
+    static
+    {
+        serializer = new StreamRequestSerializer();
+    }
+
+    public static IVersionedSerializer<StreamRequest> serializer()
+    {
+        return serializer;
+    }
+
+    protected final long sessionId;
+    protected final InetAddress target;
+
+    // if this is specified, ranges and table should not be.
+    protected final PendingFile file;
+
+    // if these are specified, file shoud not be.
+    protected final Collection<Range<Token>> ranges;
+    protected final String table;
+    protected final Iterable<ColumnFamilyStore> columnFamilies;
+    protected final OperationType type;
+
+    StreamRequest(InetAddress target, Collection<Range<Token>> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, long sessionId, OperationType type)
+    {
+        this.target = target;
+        this.ranges = ranges;
+        this.table = table;
+        this.columnFamilies = columnFamilies;
+        this.sessionId = sessionId;
+        this.type = type;
+        file = null;
+    }
+
+    StreamRequest(InetAddress target, PendingFile file, long sessionId)
+    {
+        this.target = target;
+        this.file = file;
+        this.sessionId = sessionId;
+        this.type = file.type;
+        ranges = null;
+        table = null;
+        columnFamilies = null;
+    }
+
+    public MessageOut<StreamRequest> createMessage()
+    {
+        return new MessageOut<StreamRequest>(MessagingService.Verb.STREAM_REQUEST, this, serializer);
+    }
+
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("");
+        if (file == null)
+        {
+            sb.append(table);
+            sb.append("@");
+            sb.append(columnFamilies.toString());
+            sb.append("@");
+            sb.append(target);
+            sb.append("------->");
+            for ( Range<Token> range : ranges )
+            {
+                sb.append(range);
+                sb.append(" ");
+            }
+            sb.append(type);
+        }
+        else
+        {
+            sb.append(file.toString());
+        }
+        return sb.toString();
+    }
+
+    private static class StreamRequestSerializer implements IVersionedSerializer<StreamRequest>
+    {
+        public void serialize(StreamRequest srm, DataOutput dos, int version) throws IOException
+        {
+            dos.writeLong(srm.sessionId);
+            CompactEndpointSerializationHelper.serialize(srm.target, dos);
+            if (srm.file != null)
+            {
+                dos.writeBoolean(true);
+                PendingFile.serializer().serialize(srm.file, dos, version);
+            }
+            else
+            {
+                dos.writeBoolean(false);
+                dos.writeUTF(srm.table);
+                dos.writeInt(srm.ranges.size());
+                for (Range<Token> range : srm.ranges)
+                {
+                    AbstractBounds.serializer().serialize(range, dos, version);
+                }
+
+                if (version > MessagingService.VERSION_07)
+                    dos.writeUTF(srm.type.name());
+
+                if (version > MessagingService.VERSION_080)
+                {
+                    dos.writeInt(Iterables.size(srm.columnFamilies));
+                    for (ColumnFamilyStore cfs : srm.columnFamilies)
+                        dos.writeInt(cfs.metadata.cfId);
+                }
+            }
+        }
+
+        public StreamRequest deserialize(DataInput dis, int version) throws IOException
+        {
+            long sessionId = dis.readLong();
+            InetAddress target = CompactEndpointSerializationHelper.deserialize(dis);
+            boolean singleFile = dis.readBoolean();
+            if (singleFile)
+            {
+                PendingFile file = PendingFile.serializer().deserialize(dis, version);
+                return new StreamRequest(target, file, sessionId);
+            }
+            else
+            {
+                String table = dis.readUTF();
+                int size = dis.readInt();
+                List<Range<Token>> ranges = (size == 0) ? null : new ArrayList<Range<Token>>(size);
+                for( int i = 0; i < size; ++i )
+                {
+                    ranges.add((Range<Token>) AbstractBounds.serializer().deserialize(dis, version).toTokenBounds());
+                }
+                OperationType type = OperationType.RESTORE_REPLICA_COUNT;
+                if (version > MessagingService.VERSION_07)
+                    type = OperationType.valueOf(dis.readUTF());
+
+                List<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>();
+                if (version > MessagingService.VERSION_080)
+                {
+                    int cfsSize = dis.readInt();
+                    for (int i = 0; i < cfsSize; ++i)
+                        stores.add(Table.open(table).getColumnFamilyStore(dis.readInt()));
+                }
+
+                return new StreamRequest(target, ranges, table, stores, sessionId, type);
+            }
+        }
+
+        public long serializedSize(StreamRequest streamRequestMessage, int version)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java b/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
deleted file mode 100644
index 3f54f05..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-
-/**
-* This class encapsulates the message that needs to be sent to nodes
-* that handoff data. The message contains information about ranges
-* that need to be transferred and the target node.
-*
-* If a file is specified, ranges and table will not. vice-versa should hold as well.
-*/
-public class StreamRequestMessage // TODO rename to StreamRequest
-{
-    private static final IVersionedSerializer<StreamRequestMessage> serializer;
-    static
-    {
-        serializer = new StreamRequestMessageSerializer();
-    }
-
-    public static IVersionedSerializer<StreamRequestMessage> serializer()
-    {
-        return serializer;
-    }
-
-    protected final long sessionId;
-    protected final InetAddress target;
-
-    // if this is specified, ranges and table should not be.
-    protected final PendingFile file;
-
-    // if these are specified, file shoud not be.
-    protected final Collection<Range<Token>> ranges;
-    protected final String table;
-    protected final Iterable<ColumnFamilyStore> columnFamilies;
-    protected final OperationType type;
-
-    StreamRequestMessage(InetAddress target, Collection<Range<Token>> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, long sessionId, OperationType type)
-    {
-        this.target = target;
-        this.ranges = ranges;
-        this.table = table;
-        this.columnFamilies = columnFamilies;
-        this.sessionId = sessionId;
-        this.type = type;
-        file = null;
-    }
-
-    StreamRequestMessage(InetAddress target, PendingFile file, long sessionId)
-    {
-        this.target = target;
-        this.file = file;
-        this.sessionId = sessionId;
-        this.type = file.type;
-        ranges = null;
-        table = null;
-        columnFamilies = null;
-    }
-
-    public MessageOut<StreamRequestMessage> createMessage()
-    {
-        return new MessageOut<StreamRequestMessage>(MessagingService.Verb.STREAM_REQUEST, this, serializer);
-    }
-
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder("");
-        if (file == null)
-        {
-            sb.append(table);
-            sb.append("@");
-            sb.append(columnFamilies.toString());
-            sb.append("@");
-            sb.append(target);
-            sb.append("------->");
-            for ( Range<Token> range : ranges )
-            {
-                sb.append(range);
-                sb.append(" ");
-            }
-            sb.append(type);
-        }
-        else
-        {
-            sb.append(file.toString());
-        }
-        return sb.toString();
-    }
-
-    private static class StreamRequestMessageSerializer implements IVersionedSerializer<StreamRequestMessage>
-    {
-        public void serialize(StreamRequestMessage srm, DataOutput dos, int version) throws IOException
-        {
-            dos.writeLong(srm.sessionId);
-            CompactEndpointSerializationHelper.serialize(srm.target, dos);
-            if (srm.file != null)
-            {
-                dos.writeBoolean(true);
-                PendingFile.serializer().serialize(srm.file, dos, version);
-            }
-            else
-            {
-                dos.writeBoolean(false);
-                dos.writeUTF(srm.table);
-                dos.writeInt(srm.ranges.size());
-                for (Range<Token> range : srm.ranges)
-                {
-                    AbstractBounds.serializer().serialize(range, dos, version);
-                }
-
-                if (version > MessagingService.VERSION_07)
-                    dos.writeUTF(srm.type.name());
-
-                if (version > MessagingService.VERSION_080)
-                {
-                    dos.writeInt(Iterables.size(srm.columnFamilies));
-                    for (ColumnFamilyStore cfs : srm.columnFamilies)
-                        dos.writeInt(cfs.metadata.cfId);
-                }
-            }
-        }
-
-        public StreamRequestMessage deserialize(DataInput dis, int version) throws IOException
-        {
-            long sessionId = dis.readLong();
-            InetAddress target = CompactEndpointSerializationHelper.deserialize(dis);
-            boolean singleFile = dis.readBoolean();
-            if (singleFile)
-            {
-                PendingFile file = PendingFile.serializer().deserialize(dis, version);
-                return new StreamRequestMessage(target, file, sessionId);
-            }
-            else
-            {
-                String table = dis.readUTF();
-                int size = dis.readInt();
-                List<Range<Token>> ranges = (size == 0) ? null : new ArrayList<Range<Token>>(size);
-                for( int i = 0; i < size; ++i )
-                {
-                    ranges.add((Range<Token>) AbstractBounds.serializer().deserialize(dis, version).toTokenBounds());
-                }
-                OperationType type = OperationType.RESTORE_REPLICA_COUNT;
-                if (version > MessagingService.VERSION_07)
-                    type = OperationType.valueOf(dis.readUTF());
-
-                List<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>();
-                if (version > MessagingService.VERSION_080)
-                {
-                    int cfsSize = dis.readInt();
-                    for (int i = 0; i < cfsSize; ++i)
-                        stores.add(Table.open(table).getColumnFamilyStore(dis.readInt()));
-                }
-
-                return new StreamRequestMessage(target, ranges, table, stores, sessionId, type);
-            }
-        }
-
-        public long serializedSize(StreamRequestMessage streamRequestMessage, int version)
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
index e5eed0b..023f523 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
@@ -17,14 +17,9 @@
  */
  package org.apache.cassandra.streaming;
 
- import java.io.DataInputStream;
- import java.io.IOError;
- import java.io.IOException;
-
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
  import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 
@@ -32,16 +27,16 @@ import org.apache.cassandra.net.MessageIn;
  * This verb handler handles the StreamRequestMessage that is sent by
  * the node requesting range transfer.
 */
-public class StreamRequestVerbHandler implements IVerbHandler<StreamRequestMessage>
+public class StreamRequestVerbHandler implements IVerbHandler<StreamRequest>
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamRequestVerbHandler.class);
 
-    public void doVerb(MessageIn<StreamRequestMessage> message, String id)
+    public void doVerb(MessageIn<StreamRequest> message, String id)
     {
         if (logger.isDebugEnabled())
             logger.debug("Received a StreamRequestMessage from {}", message.from);
 
-        StreamRequestMessage srm = message.payload;
+        StreamRequest srm = message.payload;
         if (logger.isDebugEnabled())
             logger.debug(srm.toString());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 4598f71..7fd6aac 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -67,16 +67,16 @@ public class SerializationsTest extends AbstractSerializationsTester
         Map<InetAddress, EndpointState> states = new HashMap<InetAddress, EndpointState>();
         states.put(InetAddress.getByName("127.0.0.1"), Statics.EndpointSt);
         states.put(InetAddress.getByName("127.0.0.2"), Statics.EndpointSt);
-        GossipDigestAckMessage ack = new GossipDigestAckMessage(Statics.Digests, states);
-        GossipDigestAck2Message ack2 = new GossipDigestAck2Message(states);
-        GossipDigestSynMessage syn = new GossipDigestSynMessage("Not a real cluster name", Statics.Digests);
+        GossipDigestAck ack = new GossipDigestAck(Statics.Digests, states);
+        GossipDigestAck2 ack2 = new GossipDigestAck2(states);
+        GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name", Statics.Digests);
 
         DataOutputStream out = getOutput("gms.Gossip.bin");
         for (GossipDigest gd : Statics.Digests)
             GossipDigest.serializer().serialize(gd, out, getVersion());
-        GossipDigestAckMessage.serializer().serialize(ack, out, getVersion());
-        GossipDigestAck2Message.serializer().serialize(ack2, out, getVersion());
-        GossipDigestSynMessage.serializer().serialize(syn, out, getVersion());
+        GossipDigestAck.serializer().serialize(ack, out, getVersion());
+        GossipDigestAck2.serializer().serialize(ack2, out, getVersion());
+        GossipDigestSyn.serializer().serialize(syn, out, getVersion());
         out.close();
     }
 
@@ -89,10 +89,10 @@ public class SerializationsTest extends AbstractSerializationsTester
         int count = 0;
         DataInputStream in = getInput("gms.Gossip.bin");
         while (count < Statics.Digests.size())
-            assert GossipDigestAck2Message.serializer().deserialize(in, getVersion()) != null;
-        assert GossipDigestAckMessage.serializer().deserialize(in, getVersion()) != null;
-        assert GossipDigestAck2Message.serializer().deserialize(in, getVersion()) != null;
-        assert GossipDigestSynMessage.serializer().deserialize(in, getVersion()) != null;
+            assert GossipDigestAck2.serializer().deserialize(in, getVersion()) != null;
+        assert GossipDigestAck.serializer().deserialize(in, getVersion()) != null;
+        assert GossipDigestAck2.serializer().deserialize(in, getVersion()) != null;
+        assert GossipDigestSyn.serializer().deserialize(in, getVersion()) != null;
         in.close();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
index ebc181a..2f9f8d9 100644
--- a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
@@ -150,14 +150,14 @@ public class SerializationsTest extends AbstractSerializationsTester
         for (int i = 0; i < 5; i++)
             ranges.add(new Range<Token>(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5)))));
         List<ColumnFamilyStore> stores = Collections.singletonList(Table.open("Keyspace1").getColumnFamilyStore("Standard1"));
-        StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getBroadcastAddress(), ranges, "Keyspace1", stores, 123L, OperationType.RESTORE_REPLICA_COUNT);
-        StreamRequestMessage msg1 = new StreamRequestMessage(FBUtilities.getBroadcastAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), 124L);
-        StreamRequestMessage msg2 = new StreamRequestMessage(FBUtilities.getBroadcastAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), 124L);
+        StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges, "Keyspace1", stores, 123L, OperationType.RESTORE_REPLICA_COUNT);
+        StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), 124L);
+        StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), 124L);
 
         DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin");
-        StreamRequestMessage.serializer().serialize(msg0, out, getVersion());
-        StreamRequestMessage.serializer().serialize(msg1, out, getVersion());
-        StreamRequestMessage.serializer().serialize(msg2, out, getVersion());
+        StreamRequest.serializer().serialize(msg0, out, getVersion());
+        StreamRequest.serializer().serialize(msg1, out, getVersion());
+        StreamRequest.serializer().serialize(msg2, out, getVersion());
         msg0.createMessage().serialize(out, getVersion());
         msg1.createMessage().serialize(out, getVersion());
         msg2.createMessage().serialize(out, getVersion());
@@ -171,9 +171,9 @@ public class SerializationsTest extends AbstractSerializationsTester
             testStreamRequestMessageWrite();
 
         DataInputStream in = getInput("streaming.StreamRequestMessage.bin");
-        assert StreamRequestMessage.serializer().deserialize(in, getVersion()) != null;
-        assert StreamRequestMessage.serializer().deserialize(in, getVersion()) != null;
-        assert StreamRequestMessage.serializer().deserialize(in, getVersion()) != null;
+        assert StreamRequest.serializer().deserialize(in, getVersion()) != null;
+        assert StreamRequest.serializer().deserialize(in, getVersion()) != null;
+        assert StreamRequest.serializer().deserialize(in, getVersion()) != null;
         assert MessageIn.read(in, getVersion(), "id") != null;
         assert MessageIn.read(in, getVersion(), "id") != null;
         assert MessageIn.read(in, getVersion(), "id") != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9471e8d0/test/unit/org/apache/cassandra/streaming/StreamUtil.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamUtil.java b/test/unit/org/apache/cassandra/streaming/StreamUtil.java
index 0719eb4..4987923 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamUtil.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamUtil.java
@@ -19,14 +19,11 @@
 
 package org.apache.cassandra.streaming;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 
 import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
 
 public class StreamUtil
 {
@@ -34,7 +31,7 @@ public class StreamUtil
      * Takes an stream request message and creates an empty status response. Exists here because StreamRequestMessage
      * is package protected.
      */
-    static public void finishStreamRequest(MessageIn<StreamRequestMessage> msg, InetAddress to)
+    static public void finishStreamRequest(MessageIn<StreamRequest> msg, InetAddress to)
     {
         StreamInSession session = StreamInSession.get(to, msg.payload.sessionId);
         try


Mime
View raw message