cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [9/15] git commit: Rewrite IncomingTcpConnection to deserialize w/o extra copies to byte[]. MessageIn now has a payload field, and uses the Verb to look up the correct deserializer. REQUEST_RESPONSE deserializer is not uniquely determined by Verb, so we look those up by their callback instead.
Date Tue, 08 May 2012 17:56:15 GMT
Rewrite IncomingTcpConnection to deserialize w/o extra copies to byte[]. MessageIn now has a payload field, and uses the Verb to look up the correct deserializer. REQUEST_RESPONSE deserializer is not uniquely determined by Verb, so we look those up by their callback instead.

patch by jbellis and yukim for CASSANDRA-3617


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a06be23f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a06be23f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a06be23f

Branch: refs/heads/trunk
Commit: a06be23fbe7859063039767ce0dff64922445f39
Parents: 021ec71
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Mon Mar 26 17:52:16 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Tue May 8 12:40:47 2012 -0500

----------------------------------------------------------------------
 .../cassandra/db/CounterMutationVerbHandler.java   |   10 +-
 .../cassandra/db/DefinitionsUpdateVerbHandler.java |   16 ++-
 src/java/org/apache/cassandra/db/DefsTable.java    |   12 --
 .../org/apache/cassandra/db/IndexScanCommand.java  |    9 +-
 .../cassandra/db/MigrationRequestVerbHandler.java  |    4 +-
 .../org/apache/cassandra/db/RangeSliceCommand.java |    9 +-
 .../apache/cassandra/db/ReadRepairVerbHandler.java |   16 +-
 .../org/apache/cassandra/db/ReadVerbHandler.java   |   13 +-
 .../cassandra/db/RowMutationVerbHandler.java       |   20 +-
 .../cassandra/db/SchemaCheckVerbHandler.java       |    2 +-
 .../org/apache/cassandra/db/SnapshotCommand.java   |    7 -
 .../apache/cassandra/db/TruncateVerbHandler.java   |   14 +-
 .../org/apache/cassandra/dht/BootStrapper.java     |   10 +-
 .../cassandra/gms/GossipDigestAck2Message.java     |    2 +-
 .../cassandra/gms/GossipDigestAck2VerbHandler.java |   20 +--
 .../cassandra/gms/GossipDigestAckMessage.java      |    4 +-
 .../cassandra/gms/GossipDigestAckVerbHandler.java  |   60 +++----
 .../cassandra/gms/GossipDigestSynMessage.java      |    2 +-
 .../cassandra/gms/GossipDigestSynVerbHandler.java  |   70 +++----
 .../cassandra/gms/GossipShutdownVerbHandler.java   |    5 +-
 src/java/org/apache/cassandra/net/AsyncResult.java |   12 +-
 .../org/apache/cassandra/net/CallbackInfo.java     |   23 ++-
 src/java/org/apache/cassandra/net/Header.java      |  154 ---------------
 .../org/apache/cassandra/net/IAsyncCallback.java   |    4 +-
 .../org/apache/cassandra/net/IAsyncResult.java     |    6 +-
 .../org/apache/cassandra/net/IVerbHandler.java     |    4 +-
 .../cassandra/net/IncomingTcpConnection.java       |   41 ++---
 .../apache/cassandra/net/MessageDeliveryTask.java  |    2 +-
 src/java/org/apache/cassandra/net/MessageIn.java   |  102 ++++++----
 .../org/apache/cassandra/net/MessagingService.java |  121 ++++++++++--
 .../cassandra/net/OutboundTcpConnection.java       |    5 -
 .../apache/cassandra/net/ResponseVerbHandler.java  |    6 +-
 .../cassandra/service/AbstractRowResolver.java     |   44 ++---
 .../cassandra/service/AntiEntropyService.java      |   83 +++------
 .../cassandra/service/DatacenterReadCallback.java  |    5 +-
 .../DatacenterSyncWriteResponseHandler.java        |    2 +-
 .../service/DatacenterWriteResponseHandler.java    |    2 +-
 .../cassandra/service/IResponseResolver.java       |   11 +-
 .../cassandra/service/IndexScanVerbHandler.java    |   10 +-
 .../apache/cassandra/service/MigrationTask.java    |    9 +-
 .../service/RangeSliceResponseResolver.java        |   22 +--
 .../cassandra/service/RangeSliceVerbHandler.java   |   11 +-
 .../org/apache/cassandra/service/ReadCallback.java |   12 +-
 .../cassandra/service/RowDigestResolver.java       |   13 +-
 .../cassandra/service/RowRepairResolver.java       |   11 +-
 .../cassandra/service/SnapshotVerbHandler.java     |   10 +-
 .../org/apache/cassandra/service/StorageProxy.java |   23 +--
 .../apache/cassandra/streaming/FileStreamTask.java |   26 +--
 .../streaming/ReplicationFinishedVerbHandler.java  |    6 +-
 .../apache/cassandra/streaming/StreamReply.java    |    2 +-
 .../streaming/StreamReplyVerbHandler.java          |   17 +-
 .../cassandra/streaming/StreamRequestMessage.java  |    4 +-
 .../streaming/StreamRequestVerbHandler.java        |   29 +--
 .../cassandra/streaming/StreamingRepairTask.java   |   44 +----
 .../apache/cassandra/db/SerializationsTest.java    |   41 ++--
 .../apache/cassandra/net/MessageSerializer.java    |   52 -----
 .../org/apache/cassandra/service/RemoveTest.java   |    2 +-
 .../cassandra/service/SerializationsTest.java      |   12 +-
 .../cassandra/streaming/SerializationsTest.java    |   12 +-
 .../org/apache/cassandra/streaming/StreamUtil.java |   16 +-
 60 files changed, 527 insertions(+), 789 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d370a85..470de64 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -33,24 +33,22 @@ import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class CounterMutationVerbHandler implements IVerbHandler
+public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
 {
     private static final Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<CounterMutation> message, String id)
     {
-        DataInputStream in = new DataInputStream(new FastByteArrayInputStream(message.getMessageBody()));
-
         try
         {
-            CounterMutation cm = CounterMutation.serializer().deserialize(in, message.getVersion());
+            CounterMutation cm = message.payload;
             if (logger.isDebugEnabled())
               logger.debug("Applying forwarded " + cm);
 
             String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
             StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get();
             WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
-            MessagingService.instance().sendReply(response.createMessage(), id, message.getFrom());
+            MessagingService.instance().sendReply(response.createMessage(), id, message.from);
         }
         catch (UnavailableException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index f017f97..5c85530 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.db;
 
+import java.util.Collection;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,6 +26,7 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 /**
@@ -32,19 +35,24 @@ import org.apache.cassandra.utils.WrappedRunnable;
  * (which is going to act as coordinator) and that node sends (pushes) it's updated schema state
  * (in form of row mutations) to all the alive nodes in the cluster.
  */
-public class DefinitionsUpdateVerbHandler implements IVerbHandler
+public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<RowMutation>>
 {
     private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
 
-    public void doVerb(final MessageIn message, String id)
+    public void doVerb(final MessageIn<Collection<RowMutation>> message, String id)
     {
-        logger.debug("Received schema mutation push from " + message.getFrom());
+        logger.debug("Received schema mutation push from " + message.from);
 
         StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
         {
             public void runMayThrow() throws Exception
             {
-                DefsTable.mergeRemoteSchema(message.getMessageBody(), message.getVersion());
+                if (message.version < MessagingService.VERSION_11)
+                {
+                    logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1, please upgrade first");
+                    return;
+                }
+                DefsTable.mergeSchema(message.payload);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index 9d2c257..58ab4a6 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -238,18 +238,6 @@ public class DefsTable
      * @throws ConfigurationException If one of metadata attributes has invalid value
      * @throws IOException If data was corrupted during transportation or failed to apply fs operations
      */
-    public static void mergeRemoteSchema(byte[] data, int version) throws ConfigurationException, IOException
-    {
-        if (version < MessagingService.VERSION_11)
-        {
-            logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1, please update first.");
-            return;
-        }
-
-        DataInputStream in = new DataInputStream(new FastByteArrayInputStream(data));
-        mergeSchema(MigrationManager.MigrationsSerializer.instance.deserialize(in, version));
-    }
-
     public static synchronized void mergeSchema(Collection<RowMutation> mutations) throws ConfigurationException, IOException
     {
         // current state of the schema

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/IndexScanCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IndexScanCommand.java b/src/java/org/apache/cassandra/db/IndexScanCommand.java
index 2f14849..ad11464 100644
--- a/src/java/org/apache/cassandra/db/IndexScanCommand.java
+++ b/src/java/org/apache/cassandra/db/IndexScanCommand.java
@@ -37,7 +37,7 @@ import org.apache.thrift.TSerializer;
 
 public class IndexScanCommand
 {
-    private static final IndexScanCommandSerializer serializer = new IndexScanCommandSerializer();
+    public static final IndexScanCommandSerializer serializer = new IndexScanCommandSerializer();
 
     public final String keyspace;
     public final String column_family;
@@ -60,13 +60,6 @@ public class IndexScanCommand
         return new MessageOut<IndexScanCommand>(MessagingService.Verb.INDEX_SCAN, this, serializer);
     }
 
-    public static IndexScanCommand read(MessageIn message) throws IOException
-    {
-        byte[] bytes = message.getMessageBody();
-        FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
-        return serializer.deserialize(new DataInputStream(bis), message.getVersion());
-    }
-
     private static class IndexScanCommandSerializer implements IVersionedSerializer<IndexScanCommand>
     {
         public void serialize(IndexScanCommand o, DataOutput out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
index 41650fa..97fd641 100644
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@ -38,10 +38,10 @@ public class MigrationRequestVerbHandler implements IVerbHandler
 
     public void doVerb(MessageIn message, String id)
     {
-        logger.debug("Received migration request from {}.", message.getFrom());
+        logger.debug("Received migration request from {}.", message.from);
         MessageOut<Collection<RowMutation>> response = new MessageOut<Collection<RowMutation>>(MessagingService.Verb.INTERNAL_RESPONSE,
                                                                                                SystemTable.serializeSchema(),
                                                                                                MigrationManager.MigrationsSerializer.instance);
-        MessagingService.instance().sendReply(response, id, message.getFrom());
+        MessagingService.instance().sendReply(response, id, message.from);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 528de5c..f029ae7 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -61,7 +61,7 @@ import org.apache.thrift.TSerializer;
 
 public class RangeSliceCommand implements IReadCommand
 {
-    private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
+    public static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
 
     public final String keyspace;
 
@@ -134,13 +134,6 @@ public class RangeSliceCommand implements IReadCommand
                '}';
     }
 
-    public static RangeSliceCommand read(MessageIn message) throws IOException
-    {
-        byte[] bytes = message.getMessageBody();
-        FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
-        return serializer.deserialize(new DataInputStream(bis), message.getVersion());
-    }
-
     public String getKeyspace()
     {
         return keyspace;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
index 5864961..bc5e820 100644
--- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -17,33 +17,27 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
 
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 
-public class ReadRepairVerbHandler implements IVerbHandler
+public class ReadRepairVerbHandler implements IVerbHandler<RowMutation>
 {
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<RowMutation> message, String id)
     {
-        DataInputStream in = new DataInputStream(new FastByteArrayInputStream(message.getMessageBody()));
-
-        RowMutation rm;
         try
         {
-            rm = RowMutation.serializer().deserialize(in, message.getVersion());
+            RowMutation rm = message.payload;
             rm.apply();
+            WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
+            MessagingService.instance().sendReply(response.createMessage(), id, message.from);
         }
         catch (IOException e)
         {
             throw new IOError(e);
         }
-
-        WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
-        MessagingService.instance().sendReply(response.createMessage(), id, message.getFrom());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index 7335d51..d94c0f1 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -31,11 +31,11 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class ReadVerbHandler implements IVerbHandler
+public class ReadVerbHandler implements IVerbHandler<ReadCommand>
 {
     private static final Logger logger = LoggerFactory.getLogger( ReadVerbHandler.class );
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<ReadCommand> message, String id)
     {
         if (StorageService.instance.isBootstrapMode())
         {
@@ -44,8 +44,7 @@ public class ReadVerbHandler implements IVerbHandler
 
         try
         {
-            FastByteArrayInputStream in = new FastByteArrayInputStream(message.getMessageBody());
-            ReadCommand command = ReadCommand.serializer().deserialize(new DataInputStream(in), message.getVersion());
+            ReadCommand command = message.payload;
             Table table = Table.open(command.table);
             Row row = command.getRow(table);
 
@@ -53,9 +52,9 @@ public class ReadVerbHandler implements IVerbHandler
                                                                           getResponse(command, row),
                                                                           ReadResponse.serializer());
             if (logger.isDebugEnabled())
-              logger.debug(String.format("Read key %s; sending response to %s@%s",
-                                          ByteBufferUtil.bytesToHex(command.key), id, message.getFrom()));
-            MessagingService.instance().sendReply(reply, id, message.getFrom());
+                logger.debug(String.format("Read key %s; sending response to %s@%s",
+                                            ByteBufferUtil.bytesToHex(command.key), id, message.from));
+            MessagingService.instance().sendReply(reply, id, message.from);
         }
         catch (IOException ex)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index 39e5ed3..fbf849a 100644
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@ -27,30 +27,30 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.*;
 
-public class RowMutationVerbHandler implements IVerbHandler
+public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
 {
     private static final Logger logger = LoggerFactory.getLogger(RowMutationVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<RowMutation> message, String id)
     {
         try
         {
-            RowMutation rm = RowMutation.fromBytes(message.getMessageBody(), message.getVersion());
+            RowMutation rm = message.payload;
             if (logger.isDebugEnabled())
               logger.debug("Applying " + rm);
 
             // Check if there were any forwarding headers in this message
-            InetAddress replyTo = message.getFrom();
-            byte[] from = message.getHeader(RowMutation.FORWARD_FROM);
-            if (from != null)
+            InetAddress replyTo = message.from;
+            byte[] from = message.parameters.get(RowMutation.FORWARD_FROM);
+            if (from == null)
             {
-                replyTo = InetAddress.getByAddress(from);
+                byte[] forwardBytes = message.parameters.get(RowMutation.FORWARD_TO);
+                if (forwardBytes != null && message.version >= MessagingService.VERSION_11)
+                    forwardToLocalNodes(rm, message.verb, forwardBytes, message.from);
             }
             else
             {
-                byte[] forwardBytes = message.getHeader(RowMutation.FORWARD_TO);
-                if (forwardBytes != null && message.getVersion() >= MessagingService.VERSION_11)
-                    forwardToLocalNodes(rm, message.getVerb(), forwardBytes, message.getFrom());
+                replyTo = InetAddress.getByAddress(from);
             }
 
             rm.apply();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
index 0003d82..d33419e 100644
--- a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
@@ -34,6 +34,6 @@ public class SchemaCheckVerbHandler implements IVerbHandler
     {
         logger.debug("Received schema check request.");
         MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
-        MessagingService.instance().sendReply(response, id, message.getFrom());
+        MessagingService.instance().sendReply(response, id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/SnapshotCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SnapshotCommand.java b/src/java/org/apache/cassandra/db/SnapshotCommand.java
index 5463298..623d5c2 100644
--- a/src/java/org/apache/cassandra/db/SnapshotCommand.java
+++ b/src/java/org/apache/cassandra/db/SnapshotCommand.java
@@ -50,13 +50,6 @@ public class SnapshotCommand
         return new MessageOut<SnapshotCommand>(MessagingService.Verb.SNAPSHOT, this, serializer);
     }
 
-    public static SnapshotCommand read(MessageIn message) throws IOException
-    {
-        byte[] bytes = message.getMessageBody();
-        FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
-        return serializer.deserialize(new DataInputStream(bis), message.getVersion());
-    }
-
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index 82bf14f..f7a7546 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -29,17 +29,15 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 
-public class TruncateVerbHandler implements IVerbHandler
+public class TruncateVerbHandler implements IVerbHandler<Truncation>
 {
     private static final Logger logger = LoggerFactory.getLogger(TruncateVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<Truncation> message, String id)
     {
-        DataInputStream in = new DataInputStream(new FastByteArrayInputStream(message.getMessageBody()));
-
         try
         {
-            Truncation t = Truncation.serializer().deserialize(in, message.getVersion());
+            Truncation t = message.payload;
             logger.debug("Applying {}", t);
 
             try
@@ -55,8 +53,8 @@ public class TruncateVerbHandler implements IVerbHandler
             logger.debug("Truncate operation succeeded at this host");
 
             TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, true);
-            logger.debug("{} applied.  Sending response to {}@{} ", new Object[]{ t, id, message.getFrom()});
-            MessagingService.instance().sendReply(response.createMessage(), id, message.getFrom());
+            logger.debug("{} applied.  Sending response to {}@{} ", new Object[]{ t, id, message.from });
+            MessagingService.instance().sendReply(response.createMessage(), id, message.from);
         }
         catch (IOException e)
         {
@@ -67,6 +65,6 @@ public class TruncateVerbHandler implements IVerbHandler
     private static void respondError(Truncation t, MessageIn truncateRequestMessage) throws IOException
     {
         TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, false);
-        MessagingService.instance().sendOneWay(response.createMessage(), truncateRequestMessage.getFrom());
+        MessagingService.instance().sendOneWay(response.createMessage(), truncateRequestMessage.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index e31f908..d6fdf68 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -178,11 +178,11 @@ public class BootStrapper
             StorageService ss = StorageService.instance;
             String tokenString = StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
             MessageOut<String> response = new MessageOut<String>(MessagingService.Verb.INTERNAL_RESPONSE, tokenString, StringSerializer.instance);
-            MessagingService.instance().sendReply(response, id, message.getFrom());
+            MessagingService.instance().sendReply(response, id, message.from);
         }
     }
 
-    private static class BootstrapTokenCallback implements IAsyncCallback
+    private static class BootstrapTokenCallback implements IAsyncCallback<String>
     {
         private volatile Token<?> token;
         private final Condition condition = new SimpleCondition();
@@ -202,9 +202,9 @@ public class BootStrapper
             return success ? token : null;
         }
 
-        public void response(MessageIn msg)
+        public void response(MessageIn<String> msg)
         {
-            token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), Charsets.UTF_8));
+            token = StorageService.getPartitioner().getTokenFactory().fromString(msg.payload);
             condition.signalAll();
         }
 
@@ -214,7 +214,7 @@ public class BootStrapper
         }
     }
 
-    private static class StringSerializer implements IVersionedSerializer<String>
+    public static class StringSerializer implements IVersionedSerializer<String>
     {
         public static final StringSerializer instance = new StringSerializer();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
index c1d880e..609331f 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
  * last stage of the 3 way messaging of the Gossip protocol.
  */
 
-class GossipDigestAck2Message
+public class GossipDigestAck2Message
 {
     private static final IVersionedSerializer<GossipDigestAck2Message> serializer;
     static

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
index 141429e..42080bd 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
@@ -29,30 +29,18 @@ import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 
-public class GossipDigestAck2VerbHandler implements IVerbHandler
+public class GossipDigestAck2VerbHandler implements IVerbHandler<GossipDigestAck2Message>
 {
     private static final Logger logger = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<GossipDigestAck2Message> message, String id)
     {
         if (logger.isTraceEnabled())
         {
-            InetAddress from = message.getFrom();
+            InetAddress from = message.from;
             logger.trace("Received a GossipDigestAck2Message from {}", from);
         }
-
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
-        GossipDigestAck2Message gDigestAck2Message;
-        try
-        {
-            gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis, message.getVersion());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        Map<InetAddress, EndpointState> remoteEpStateMap = gDigestAck2Message.getEndpointStateMap();
+        Map<InetAddress, EndpointState> remoteEpStateMap = message.payload.getEndpointStateMap();
         /* Notify the Failure Detector */
         Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
         Gossiper.instance.applyStateLocally(remoteEpStateMap);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java b/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
index 4cc25e5..60706cb 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
  * endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol.
  */
 
-class GossipDigestAckMessage // TODO rename
+public class GossipDigestAckMessage // TODO rename
 {
     private static final IVersionedSerializer<GossipDigestAckMessage> serializer;
     static
@@ -42,7 +42,7 @@ class GossipDigestAckMessage // TODO rename
     final List<GossipDigest> gDigestList;
     final Map<InetAddress, EndpointState> epStateMap;
 
-    static IVersionedSerializer<GossipDigestAckMessage> serializer()
+    public static IVersionedSerializer<GossipDigestAckMessage> serializer()
     {
         return serializer;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 5c3e06b..69fa5cf 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -33,13 +33,13 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
-public class GossipDigestAckVerbHandler implements IVerbHandler
+public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAckMessage>
 {
     private static final Logger logger = LoggerFactory.getLogger(GossipDigestAckVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<GossipDigestAckMessage> message, String id)
     {
-        InetAddress from = message.getFrom();
+        InetAddress from = message.from;
         if (logger.isTraceEnabled())
             logger.trace("Received a GossipDigestAckMessage from {}", from);
         if (!Gossiper.instance.isEnabled())
@@ -49,42 +49,32 @@ public class GossipDigestAckVerbHandler implements IVerbHandler
             return;
         }
 
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
+        GossipDigestAckMessage gDigestAckMessage = message.payload;
+        List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
+        Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
 
-        try
+        if ( epStateMap.size() > 0 )
         {
-            GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis, message.getVersion());
-            List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
-            Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
-
-            if ( epStateMap.size() > 0 )
-            {
-                /* Notify the Failure Detector */
-                Gossiper.instance.notifyFailureDetector(epStateMap);
-                Gossiper.instance.applyStateLocally(epStateMap);
-            }
-
-            /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
-            Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
-            for( GossipDigest gDigest : gDigestList )
-            {
-                InetAddress addr = gDigest.getEndpoint();
-                EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
-                if ( localEpStatePtr != null )
-                    deltaEpStateMap.put(addr, localEpStatePtr);
-            }
-
-            MessageOut<GossipDigestAck2Message> gDigestAck2Message = new MessageOut<GossipDigestAck2Message>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
-                                                                                                             new GossipDigestAck2Message(deltaEpStateMap), 
-                                                                                                             GossipDigestAck2Message.serializer());
-            if (logger.isTraceEnabled())
-                logger.trace("Sending a GossipDigestAck2Message to {}", from);
-            MessagingService.instance().sendOneWay(gDigestAck2Message, from);
+            /* Notify the Failure Detector */
+            Gossiper.instance.notifyFailureDetector(epStateMap);
+            Gossiper.instance.applyStateLocally(epStateMap);
         }
-        catch ( IOException e )
+
+        /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
+        Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
+        for( GossipDigest gDigest : gDigestList )
         {
-            throw new RuntimeException(e);
+            InetAddress addr = gDigest.getEndpoint();
+            EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
+            if ( localEpStatePtr != null )
+                deltaEpStateMap.put(addr, localEpStatePtr);
         }
+
+        MessageOut<GossipDigestAck2Message> gDigestAck2Message = new MessageOut<GossipDigestAck2Message>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
+                                                                                                         new GossipDigestAck2Message(deltaEpStateMap),
+                                                                                                         GossipDigestAck2Message.serializer());
+        if (logger.isTraceEnabled())
+            logger.trace("Sending a GossipDigestAck2Message to {}", from);
+        MessagingService.instance().sendOneWay(gDigestAck2Message, from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java b/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
index e798edb..05e210f 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.net.CompactEndpointSerializationHelper;
  * round.
  */
 
-class GossipDigestSynMessage
+public class GossipDigestSynMessage
 {
     private static final IVersionedSerializer<GossipDigestSynMessage> serializer;
     static

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 14bb324..70df9d3 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -32,13 +32,13 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
-public class GossipDigestSynVerbHandler implements IVerbHandler
+public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSynMessage>
 {
     private static final Logger logger = LoggerFactory.getLogger( GossipDigestSynVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<GossipDigestSynMessage> message, String id)
     {
-        InetAddress from = message.getFrom();
+        InetAddress from = message.from;
         if (logger.isTraceEnabled())
             logger.trace("Received a GossipDigestSynMessage from {}", from);
         if (!Gossiper.instance.isEnabled())
@@ -48,50 +48,40 @@ public class GossipDigestSynVerbHandler implements IVerbHandler
             return;
         }
 
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
-
-        try
+        GossipDigestSynMessage gDigestMessage = message.payload;
+        /* If the message is from a different cluster throw it away. */
+        if (!gDigestMessage.clusterId.equals(DatabaseDescriptor.getClusterName()))
         {
-            GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis, message.getVersion());
-            /* If the message is from a different cluster throw it away. */
-            if ( !gDigestMessage.clusterId.equals(DatabaseDescriptor.getClusterName()) )
-            {
-                logger.warn("ClusterName mismatch from " + from + " " + gDigestMessage.clusterId  + "!=" + DatabaseDescriptor.getClusterName());
-                return;
-            }
+            logger.warn("ClusterName mismatch from " + from + " " + gDigestMessage.clusterId  + "!=" + DatabaseDescriptor.getClusterName());
+            return;
+        }
 
-            List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
-            if (logger.isTraceEnabled())
+        List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
+        if (logger.isTraceEnabled())
+        {
+            StringBuilder sb = new StringBuilder();
+            for ( GossipDigest gDigest : gDigestList )
             {
-                StringBuilder sb = new StringBuilder();
-                for ( GossipDigest gDigest : gDigestList )
-                {
-                    sb.append(gDigest);
-                    sb.append(" ");
-                }
-                logger.trace("Gossip syn digests are : " + sb.toString());
+                sb.append(gDigest);
+                sb.append(" ");
             }
-            /* Notify the Failure Detector */
-            Gossiper.instance.notifyFailureDetector(gDigestList);
+            logger.trace("Gossip syn digests are : " + sb.toString());
+        }
+        /* Notify the Failure Detector */
+        Gossiper.instance.notifyFailureDetector(gDigestList);
 
-            doSort(gDigestList);
+        doSort(gDigestList);
 
-            List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
-            Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
-            Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
+        List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
+        Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
+        Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
 
-            MessageOut<GossipDigestAckMessage> gDigestAckMessage = new MessageOut<GossipDigestAckMessage>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
-                                                                                                          new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap),
-                                                                                                          GossipDigestAckMessage.serializer());
-            if (logger.isTraceEnabled())
-                logger.trace("Sending a GossipDigestAckMessage to {}", from);
-            MessagingService.instance().sendOneWay(gDigestAckMessage, from);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
+        MessageOut<GossipDigestAckMessage> gDigestAckMessage = new MessageOut<GossipDigestAckMessage>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
+                                                                                                      new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap),
+                                                                                                      GossipDigestAckMessage.serializer());
+        if (logger.isTraceEnabled())
+            logger.trace("Sending a GossipDigestAckMessage to {}", from);
+        MessagingService.instance().sendOneWay(gDigestAckMessage, from);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
index b29013d..8990739 100644
--- a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
@@ -31,13 +31,12 @@ public class GossipShutdownVerbHandler implements IVerbHandler
 
     public void doVerb(MessageIn message, String id)
     {
-        InetAddress from = message.getFrom();
         if (!Gossiper.instance.isEnabled())
         {
-            logger.debug("Ignoring shutdown message from {} because gossip is disabled", from);
+            logger.debug("Ignoring shutdown message from {} because gossip is disabled", message.from);
             return;
         }
-        FailureDetector.instance.forceConviction(from);
+        FailureDetector.instance.forceConviction(message.from);
     }
     
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/AsyncResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/AsyncResult.java b/src/java/org/apache/cassandra/net/AsyncResult.java
index 9a3985b..83023e1 100644
--- a/src/java/org/apache/cassandra/net/AsyncResult.java
+++ b/src/java/org/apache/cassandra/net/AsyncResult.java
@@ -28,11 +28,11 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class AsyncResult implements IAsyncResult
+class AsyncResult<T> implements IAsyncResult<T>
 {
     private static final Logger logger = LoggerFactory.getLogger(AsyncResult.class);
 
-    private byte[] result;
+    private T result;
     private final AtomicBoolean done = new AtomicBoolean(false);
     private final Lock lock = new ReentrantLock();
     private final Condition condition;
@@ -45,7 +45,7 @@ class AsyncResult implements IAsyncResult
         startTime = System.currentTimeMillis();
     }
 
-    public byte[] get(long timeout, TimeUnit tu) throws TimeoutException
+    public T get(long timeout, TimeUnit tu) throws TimeoutException
     {
         lock.lock();
         try
@@ -77,15 +77,15 @@ class AsyncResult implements IAsyncResult
         return result;
     }
 
-    public void result(MessageIn response)
+    public void result(MessageIn<T> response)
     {
         try
         {
             lock.lock();
             if (!done.get())
             {
-                from = response.getFrom();
-                result = response.getMessageBody();
+                from = response.from;
+                result = response.payload;
                 done.set(true);
                 condition.signal();
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index d872397..1def33a 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,35 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.net;
 
 import java.net.InetAddress;
 
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageProxy;
 
 /**
  * Encapsulates the callback information.
- * The ability to set the message is useful in cases for when a hint needs
+ * The ability to set the message is useful in cases for when a hint needs 
  * to be written due to a timeout in the response from a replica.
  */
-class CallbackInfo
+public class CallbackInfo
 {
     protected final InetAddress target;
     protected final IMessageCallback callback;
-    protected final MessageOut<?> message;
+    protected final MessageOut<?> sentMessage;
+    protected final IVersionedSerializer<?> serializer;
 
-    public CallbackInfo(InetAddress target, IMessageCallback callback)
+    public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
     {
         this.target = target;
         this.callback = callback;
-        this.message = null;
+        this.serializer = serializer;
+        this.sentMessage = null;
     }
 
-    public CallbackInfo(InetAddress target, IMessageCallback callback, MessageOut<?> message)
+    public CallbackInfo(InetAddress target, IMessageCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer)
     {
         this.target = target;
         this.callback = callback;
-        this.message = message;
+        this.sentMessage = sentMessage;
+        this.serializer = serializer;
     }
 
     /**
@@ -54,6 +59,6 @@ class CallbackInfo
      */
     public boolean shouldHint()
     {
-        return message != null && StorageProxy.shouldHint(target);
+        return sentMessage != null && StorageProxy.shouldHint(target);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/Header.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/Header.java b/src/java/org/apache/cassandra/net/Header.java
deleted file mode 100644
index 2b4c2b2..0000000
--- a/src/java/org/apache/cassandra/net/Header.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.Hashtable;
-import java.util.Map;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.utils.FBUtilities;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-public class Header
-{
-    private static final IVersionedSerializer<Header> serializer;
-
-    static
-    {
-        serializer = new HeaderSerializer();
-    }
-
-    public static IVersionedSerializer<Header> serializer()
-    {
-        return serializer;
-    }
-
-    // "from" is the ultimate origin of this request (the coordinator), which in a multi-DC setup
-    // is not necessarily the same as the node that forwards us the request (see StorageProxy.sendMessages
-    // and RowMutationVerbHandler.forwardToLocalNodes)
-    private final InetAddress from;
-    private final MessagingService.Verb verb;
-    protected final Map<String, byte[]> details;
-
-    Header(InetAddress from, MessagingService.Verb verb)
-    {
-        this(from, verb, Collections.<String, byte[]>emptyMap());
-    }
-
-    Header(InetAddress from, MessagingService.Verb verb, Map<String, byte[]> details)
-    {
-        assert from != null;
-        assert verb != null;
-
-        this.from = from;
-        this.verb = verb;
-        this.details = ImmutableMap.copyOf(details);
-    }
-
-    InetAddress getFrom()
-    {
-        return from;
-    }
-
-    MessagingService.Verb getVerb()
-    {
-        return verb;
-    }
-
-    byte[] getDetail(String key)
-    {
-        return details.get(key);
-    }
-
-    Header withDetailsAdded(String key, byte[] value)
-    {
-        Map<String, byte[]> detailsCopy = Maps.newHashMap(details);
-        detailsCopy.put(key, value);
-        return new Header(from, verb, detailsCopy);
-    }
-
-    Header withDetailsRemoved(String key)
-    {
-        if (!details.containsKey(key))
-            return this;
-        Map<String, byte[]> detailsCopy = Maps.newHashMap(details);
-        detailsCopy.remove(key);
-        return new Header(from, verb, detailsCopy);
-    }
-
-    public int serializedSize()
-    {
-        int size = 0;
-        size += CompactEndpointSerializationHelper.serializedSize(getFrom());
-        size += 4;
-        size += 4;
-        for (String key : details.keySet())
-        {
-            size += 2 + FBUtilities.encodedUTF8Length(key);
-            byte[] value = details.get(key);
-            size += 4 + value.length;
-        }
-        return size;
-    }
-}
-
-class HeaderSerializer implements IVersionedSerializer<Header>
-{
-    public void serialize(Header t, DataOutput dos, int version) throws IOException
-    {
-        CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
-        dos.writeInt(t.getVerb().ordinal());
-        dos.writeInt(t.details.size());
-        for (String key : t.details.keySet())
-        {
-            dos.writeUTF(key);
-            byte[] value = t.details.get(key);
-            dos.writeInt(value.length);
-            dos.write(value);
-        }
-    }
-
-    public Header deserialize(DataInput dis, int version) throws IOException
-    {
-        InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
-        int verbOrdinal = dis.readInt();
-        int size = dis.readInt();
-        Map<String, byte[]> details = new Hashtable<String, byte[]>(size);
-        for ( int i = 0; i < size; ++i )
-        {
-            String key = dis.readUTF();
-            int length = dis.readInt();
-            byte[] bytes = new byte[length];
-            dis.readFully(bytes);
-            details.put(key, bytes);
-        }
-        return new Header(from, MessagingService.VERBS[verbOrdinal], details);
-    }
-
-    public long serializedSize(Header header, int version)
-    {
-        throw new UnsupportedOperationException();
-    }
-}
-
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 797779b..868d368 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -23,10 +23,10 @@ package org.apache.cassandra.net;
  * service.  In particular, if any shared state is referenced, making
  * response alone synchronized will not suffice.
  */
-public interface IAsyncCallback extends IMessageCallback
+public interface IAsyncCallback<T> extends IMessageCallback
 {
     /**
      * @param msg response received.
      */
-    public void response(MessageIn msg);
+    public void response(MessageIn<T> msg);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/IAsyncResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncResult.java b/src/java/org/apache/cassandra/net/IAsyncResult.java
index 545cfb3..87a4c73 100644
--- a/src/java/org/apache/cassandra/net/IAsyncResult.java
+++ b/src/java/org/apache/cassandra/net/IAsyncResult.java
@@ -21,7 +21,7 @@ import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public interface IAsyncResult extends IMessageCallback
+public interface IAsyncResult<T> extends IMessageCallback
 {
     /**
      * Same operation as the above get() but allows the calling
@@ -30,13 +30,13 @@ public interface IAsyncResult extends IMessageCallback
      * @param tu the time unit of the timeout argument
      * @return the result wrapped in an Object[]
     */
-    public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
+    public T get(long timeout, TimeUnit tu) throws TimeoutException;
 
     /**
      * Store the result obtained for the submitted task.
      * @param result the response message
      */
-    public void result(MessageIn result);
+    public void result(MessageIn<T> result);
 
     public InetAddress getFrom();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/IVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IVerbHandler.java b/src/java/org/apache/cassandra/net/IVerbHandler.java
index 96867ec..8ae63e2 100644
--- a/src/java/org/apache/cassandra/net/IVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/IVerbHandler.java
@@ -23,7 +23,7 @@ package org.apache.cassandra.net;
  * for a given verb.
  */
 
-public interface IVerbHandler
+public interface IVerbHandler<T>
 {
     /**
      * This method delivers a message to the implementing class (if the implementing
@@ -34,5 +34,5 @@ public interface IVerbHandler
      * @param message - incoming message that needs handling.
      * @param id
      */
-    public void doVerb(MessageIn message, String id);
+    public void doVerb(MessageIn<T> message, String id);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 9647e8f..45d8255 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -17,15 +17,18 @@
  */
 package org.apache.cassandra.net;
 
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
 
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.streaming.IncomingStreamReader;
 import org.apache.cassandra.streaming.StreamHeader;
 
@@ -33,8 +36,6 @@ public class IncomingTcpConnection extends Thread
 {
     private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
 
-    private static final int CHUNK_SIZE = 1024 * 1024;
-
     private final Socket socket;
     public InetAddress from;
 
@@ -126,35 +127,25 @@ public class IncomingTcpConnection extends Thread
 
     private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
     {
-        int totalSize = input.readInt();
-        String id = input.readUTF();
-        Header header = Header.serializer().deserialize(input, version);
-
-        int bodySize = input.readInt();
-        byte[] body = new byte[bodySize];
-        // readFully allocates a direct buffer the size of the chunk it is asked to read,
-        // so we cap that at CHUNK_SIZE.  See https://issues.apache.org/jira/browse/CASSANDRA-2654
-        int remainder = bodySize % CHUNK_SIZE;
-        for (int offset = 0; offset < bodySize - remainder; offset += CHUNK_SIZE)
-            input.readFully(body, offset, CHUNK_SIZE);
-        input.readFully(body, bodySize - remainder, remainder);
-        // earlier versions would send unnecessary bytes left over at the end of a buffer, too
-        long remaining = totalSize - OutboundTcpConnection.messageLength(header, id, body);
-        while (remaining > 0)
-            remaining -= input.skip(remaining);
+        if (version <= MessagingService.VERSION_11)
+            input.readInt(); // size of entire message. in 1.0+ this is just a placeholder
 
-        // for non-streaming connections, continue to read the messages (and ignore them) until sender
-        // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
+        String id = input.readUTF();
+        MessageIn message = MessageIn.read(input, version, id);
+        if (message == null)
+        {
+            // callback expired; nothing to do
+            return null;
+        }
         if (version <= MessagingService.current_version)
         {
-            MessageIn message = new MessageIn(header, body, version);
             MessagingService.instance().receive(message, id);
         }
         else
         {
             logger.debug("Received connection from newer protocol version {}. Ignoring message", version);
         }
-        return header.getFrom();
+        return message.from;
     }
 
     private void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index b0603ff..a5fd614 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -39,7 +39,7 @@ public class MessageDeliveryTask implements Runnable
 
     public void run()
     {
-        MessagingService.Verb verb = message.getVerb();
+        MessagingService.Verb verb = message.verb;
         if (MessagingService.DROPPABLE_VERBS.contains(verb)
             && System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index f326802d..fe053a4 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -17,71 +17,95 @@
  */
 package org.apache.cassandra.net;
 
+import java.io.DataInput;
+import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.io.IVersionedSerializer;
 
-public class MessageIn
+public class MessageIn<T>
 {
-    final Header header;
-    private final byte[] body;
-    private final transient int version;
+    public final InetAddress from;
+    public final T payload;
+    public final Map<String, byte[]> parameters;
+    public final MessagingService.Verb verb;
+    public final int version;
 
-    public MessageIn(Header header, byte[] body, int version)
+    private MessageIn(InetAddress from, T payload, Map<String, byte[]> parameters, MessagingService.Verb verb, int version)
     {
-        assert header != null;
-        assert body != null;
-
-        this.header = header;
-        this.body = body;
+        this.from = from;
+        this.payload = payload;
+        this.parameters = parameters;
+        this.verb = verb;
         this.version = version;
     }
 
-    public MessageIn(InetAddress from, MessagingService.Verb verb, byte[] body, int version)
-    {
-        this(new Header(from, verb), body, version);
-    }
-
-    public byte[] getHeader(String key)
+    public static <T> MessageIn<T> create(InetAddress from, T payload, Map<String, byte[]> parameters, MessagingService.Verb verb, int version)
     {
-        return header.getDetail(key);
+        return new MessageIn<T>(from, payload, parameters, verb, version);
     }
 
-    public byte[] getMessageBody()
+    public static <T2> MessageIn<T2> read(DataInput in, int version, String id) throws IOException
     {
-        return body;
-    }
+        InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
 
-    public int getVersion()
-    {
-        return version;
-    }
+        MessagingService.Verb verb = MessagingService.Verb.values()[in.readInt()];
+        int parameterCount = in.readInt();
+        Map<String, byte[]> parameters;
+        if (parameterCount == 0)
+        {
+            parameters = Collections.emptyMap();
+        }
+        else
+        {
+            ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
+            for (int i = 0; i < parameterCount; i++)
+            {
+                String key = in.readUTF();
+                byte[] value = new byte[in.readInt()];
+                in.readFully(value);
+                builder.put(key, value);
+            }
+            parameters = builder.build();
+        }
 
-    public InetAddress getFrom()
-    {
-        return header.getFrom();
+        int payloadSize = in.readInt();
+        if (payloadSize == 0)
+            return create(from, null, parameters, verb, version);
+        IVersionedSerializer<T2> serializer = (IVersionedSerializer<T2>) MessagingService.verbSerializers.get(verb);
+        if (serializer instanceof MessagingService.CallbackDeterminedSerializer)
+        {
+            CallbackInfo callback = MessagingService.instance().getRegisteredCallback(id);
+            if (callback == null)
+            {
+                // reply for expired callback.  we'll have to skip it.
+                in.skipBytes(payloadSize);
+                return null;
+            }
+            serializer = (IVersionedSerializer<T2>) callback.serializer;
+        }
+        T2 payload = serializer.deserialize(in, version);
+        return MessageIn.create(from, payload, parameters, verb, version);
     }
 
     public Stage getMessageType()
     {
-        return MessagingService.verbStages.get(getVerb());
-    }
-
-    public MessagingService.Verb getVerb()
-    {
-        return header.getVerb();
+        return MessagingService.verbStages.get(verb);
     }
 
     public String toString()
     {
         StringBuilder sbuf = new StringBuilder("");
         String separator = System.getProperty("line.separator");
-        sbuf.append("FROM:" + getFrom())
-            .append(separator)
-            .append("TYPE:" + getMessageType())
-            .append(separator)
-            .append("VERB:" + getVerb())
-            .append(separator);
+        sbuf.append("FROM:").append(from)
+            .append(separator).append("TYPE:").append(getMessageType())
+            .append(separator).append("VERB:").append(verb)
+        	.append(separator);
         return sbuf.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 649f81d..ef85a82 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.net;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOError;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
@@ -44,16 +46,22 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.gms.GossipDigestAck2Message;
+import org.apache.cassandra.gms.GossipDigestAckMessage;
+import org.apache.cassandra.gms.GossipDigestSynMessage;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.service.AntiEntropyService;
+import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.FileStreamTask;
-import org.apache.cassandra.streaming.StreamHeader;
+import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.utils.*;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -150,9 +158,88 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
     }};
 
+    /**
+     * Messages we receive in IncomingTcpConnection have a Verb that tells us what kind of message it is.
+     * Most of the time, this is enough to determine how to deserialize the message payload.
+     * The exception is the REQUEST_RESPONSE verb, which just means "a reply to something you told me to do."
+     * Traditionally, this was fine since each VerbHandler knew what type of payload it expected, and
+     * handled the deserialization itself.  Now that we do that in ITC, to avoid the extra copy to an
+     * intermediary byte[] (See CASSANDRA-3617), we need to wire that up to the CallbackInfo object
+     * (see below).
+     */
+    public static final EnumMap<Verb, IVersionedSerializer<?>> verbSerializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
+    {{
+        put(Verb.REQUEST_RESPONSE, CallbackDeterminedSerializer.instance);
+        put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
+
+        put(Verb.MUTATION, RowMutation.serializer());
+        put(Verb.READ_REPAIR, RowMutation.serializer());
+        put(Verb.READ, ReadCommand.serializer());
+        put(Verb.STREAM_REPLY, StreamReply.serializer);
+        put(Verb.STREAM_REQUEST, StreamRequestMessage.serializer());
+        put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
+        put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
+        put(Verb.TREE_REQUEST, AntiEntropyService.TreeRequest.serializer);
+        put(Verb.TREE_RESPONSE, AntiEntropyService.Validator.serializer);
+        put(Verb.STREAMING_REPAIR_REQUEST, StreamingRepairTask.serializer);
+        put(Verb.STREAMING_REPAIR_RESPONSE, UUIDGen.serializer);
+        put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAckMessage.serializer());
+        put(Verb.GOSSIP_DIGEST_ACK2, GossipDigestAck2Message.serializer());
+        put(Verb.GOSSIP_DIGEST_SYN, GossipDigestSynMessage.serializer());
+        put(Verb.DEFINITIONS_UPDATE, MigrationManager.MigrationsSerializer.instance);
+        put(Verb.TRUNCATE, Truncation.serializer());
+        put(Verb.SCHEMA_CHECK, null);
+        put(Verb.INDEX_SCAN, IndexScanCommand.serializer);
+        put(Verb.REPLICATION_FINISHED, null);
+        put(Verb.COUNTER_MUTATION, CounterMutation.serializer());
+    }};
+
+    /**
+     * A Map of what kind of serializer to wire up to a REQUEST_RESPONSE callback, based on outbound Verb.
+     */
+    public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
+    {{
+        put(Verb.MUTATION, WriteResponse.serializer());
+        put(Verb.READ_REPAIR, WriteResponse.serializer());
+        put(Verb.COUNTER_MUTATION, WriteResponse.serializer());
+        put(Verb.RANGE_SLICE, RangeSliceReply.serializer);
+        put(Verb.READ, ReadResponse.serializer());
+        put(Verb.TRUNCATE, TruncateResponse.serializer());
+        put(Verb.SNAPSHOT, null);
+
+        put(Verb.MIGRATION_REQUEST, MigrationManager.MigrationsSerializer.instance);
+        put(Verb.SCHEMA_CHECK, UUIDGen.serializer);
+        put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
+        put(Verb.REPLICATION_FINISHED, null);
+    }};
+
     /* This records all the results mapped by message Id */
     private final ExpiringMap<String, CallbackInfo> callbacks;
 
+    /**
+     * A placeholder class that means "deserialize using the callback." We can't implement this without
+     * special-case code in IncomingTcpConnection because there is no way to pass the message id to IVersionedSerializer.
+     */
+    static class CallbackDeterminedSerializer implements IVersionedSerializer<Object>
+    {
+        public static final CallbackDeterminedSerializer instance = new CallbackDeterminedSerializer();
+
+        public Object deserialize(DataInput in, int version) throws IOException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void serialize(Object o, DataOutput out, int version) throws IOException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public long serializedSize(Object o, int version)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<Verb, IVerbHandler> verbHandlers;
 
@@ -255,8 +342,8 @@ public final class MessagingService implements MessagingServiceMBean
 
                 if (expiredCallbackInfo.shouldHint())
                 {
-                    assert expiredCallbackInfo.message != null;
-                    RowMutation rm = (RowMutation) expiredCallbackInfo.message.payload;
+                    assert expiredCallbackInfo.sentMessage != null;
+                    RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload;
                     return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null);
                 }
 
@@ -414,9 +501,9 @@ public final class MessagingService implements MessagingServiceMBean
 
         // If HH is enabled and this is a mutation message => store the message to track for potential hints.
         if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
-            previous = callbacks.put(messageId, new CallbackInfo(to, cb, message), timeout);
+            previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
         else
-            previous = callbacks.put(messageId, new CallbackInfo(to, cb), timeout);
+            previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
 
         assert previous == null;
         return messageId;
@@ -495,9 +582,9 @@ public final class MessagingService implements MessagingServiceMBean
         connection.enqueue(processedMessage, id);
     }
 
-    public IAsyncResult sendRR(MessageOut message, InetAddress to)
+    public <T> IAsyncResult<T> sendRR(MessageOut message, InetAddress to)
     {
-        IAsyncResult iar = new AsyncResult();
+        IAsyncResult<T> iar = new AsyncResult();
         sendRR(message, to, iar);
         return iar;
     }
@@ -594,8 +681,8 @@ public final class MessagingService implements MessagingServiceMBean
     public void receive(MessageIn message, String id)
     {
         if (logger.isTraceEnabled())
-            logger.trace(FBUtilities.getBroadcastAddress() + " received " + message.getVerb()
-                          + " from " + id + "@" + message.getFrom());
+            logger.trace(FBUtilities.getBroadcastAddress() + " received " + message.verb
+                          + " from " + id + "@" + message.from);
 
         message = SinkManager.processInboundMessage(message, id);
         if (message == null)
@@ -603,10 +690,20 @@ public final class MessagingService implements MessagingServiceMBean
 
         Runnable runnable = new MessageDeliveryTask(message, id);
         ExecutorService stage = StageManager.getStage(message.getMessageType());
-        assert stage != null : "No stage for message type " + message.getVerb();
+        assert stage != null : "No stage for message type " + message.verb;
         stage.execute(runnable);
     }
 
+    public void setCallbackForTests(String messageId, CallbackInfo callback)
+    {
+        callbacks.put(messageId, callback);
+    }
+
+    public CallbackInfo getRegisteredCallback(String messageId)
+    {
+        return callbacks.get(messageId);
+    }
+
     public CallbackInfo removeRegisteredCallback(String messageId)
     {
         return callbacks.remove(messageId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 72164d7..a5cf79d 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -196,11 +196,6 @@ public class OutboundTcpConnection extends Thread
         message.serialize(out, version);
     }
 
-    public static int messageLength(Header header, String id, byte[] bytes)
-    {
-        return 2 + FBUtilities.encodedUTF8Length(id) + header.serializedSize() + 4 + bytes.length;
-    }
-
     private void disconnect()
     {
         if (socket != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 3016daf..106f76d 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -35,18 +35,18 @@ public class ResponseVerbHandler implements IVerbHandler
         }
 
         IMessageCallback cb = callbackInfo.callback;
-        MessagingService.instance().maybeAddLatency(cb, message.getFrom(), age);
+        MessagingService.instance().maybeAddLatency(cb, message.from, age);
 
         if (cb instanceof IAsyncCallback)
         {
             if (logger.isDebugEnabled())
-                logger.debug("Processing response on a callback from " + id + "@" + message.getFrom());
+                logger.debug("Processing response on a callback from " + id + "@" + message.from);
             ((IAsyncCallback) cb).response(message);
         }
         else
         {
             if (logger.isDebugEnabled())
-                logger.debug("Processing response on an async result from " + id + "@" + message.getFrom());
+                logger.debug("Processing response on an async result from " + id + "@" + message.from);
             ((IAsyncResult) cb).result(message);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index d4a3b65..b1647a2 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -17,33 +17,27 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.DataInputStream;
-import java.io.IOError;
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Collections;
+import java.util.Set;
 
-import org.apache.commons.lang.ArrayUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
-public abstract class AbstractRowResolver implements IResponseResolver<Row>
+public abstract class AbstractRowResolver implements IResponseResolver<ReadResponse, Row>
 {
     protected static final Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
 
-    private static final MessageIn FAKE_MESSAGE = new MessageIn(FBUtilities.getBroadcastAddress(), MessagingService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, -1);
-
     protected final String table;
-    protected final ConcurrentMap<MessageIn, ReadResponse> replies = new NonBlockingHashMap<MessageIn, ReadResponse>();
+    protected final Set<MessageIn<ReadResponse>> replies = new NonBlockingHashSet<MessageIn<ReadResponse>>();
     protected final DecoratedKey key;
 
     public AbstractRowResolver(ByteBuffer key, String table)
@@ -52,33 +46,25 @@ public abstract class AbstractRowResolver implements IResponseResolver<Row>
         this.table = table;
     }
 
-    public void preprocess(MessageIn message)
+    public void preprocess(MessageIn<ReadResponse> message)
     {
-        byte[] body = message.getMessageBody();
-        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
-        try
-        {
-            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
-            if (logger.isDebugEnabled())
-                logger.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
-            replies.put(message, result);
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+        replies.add(message);
     }
 
     /** hack so local reads don't force de/serialization of an extra real Message */
     public void injectPreProcessed(ReadResponse result)
     {
-        assert replies.get(FAKE_MESSAGE) == null; // should only be one local reply
-        replies.put(FAKE_MESSAGE, result);
+        MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
+                                                           result,
+                                                           Collections.<String, byte[]>emptyMap(),
+                                                           MessagingService.Verb.INTERNAL_RESPONSE,
+                                                           MessagingService.current_version);
+        replies.add(message);
     }
 
-    public Iterable<MessageIn> getMessages()
+    public Iterable<MessageIn<ReadResponse>> getMessages()
     {
-        return replies.keySet();
+        return replies;
     }
 
     public int getMaxLiveColumns()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index f9728ad..a13270a 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -221,19 +221,6 @@ public class AntiEntropyService
     }
 
     /**
-<<<<<<< HEAD
-=======
-     * Requests a tree from the given node, and returns the request that was sent.
-     */
-    TreeRequest request(String sessionid, InetAddress remote, Range<Token> range, String ksname, String cfname)
-    {
-        TreeRequest request = new TreeRequest(sessionid, remote, range, new CFPair(ksname, cfname));
-        MessagingService.instance().sendOneWay(request.createMessage(), remote);
-        return request;
-    }
-
-    /**
->>>>>>> ddbe4e6... MessageOut
      * Responds to the node that requested the given valid tree.
      * @param validator A locally generated validator
      * @param local localhost (parameterized for testing)
@@ -454,31 +441,21 @@ public class AntiEntropyService
      * Handler for requests from remote nodes to generate a valid tree.
      * The payload is a CFPair representing the columnfamily to validate.
      */
-    public static class TreeRequestVerbHandler implements IVerbHandler
+    public static class TreeRequestVerbHandler implements IVerbHandler<TreeRequest>
     {
         /**
          * Trigger a validation compaction which will return the tree upon completion.
          */
-        public void doVerb(MessageIn message, String id)
+        public void doVerb(MessageIn<TreeRequest> message, String id)
         {
-            byte[] bytes = message.getMessageBody();
-
-            DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes));
-            try
-            {
-                TreeRequest remotereq = TreeRequest.serializer.deserialize(buffer, message.getVersion());
-                TreeRequest request = new TreeRequest(remotereq.sessionid, message.getFrom(), remotereq.range, remotereq.cf);
-
-                // trigger readonly-compaction
-                ColumnFamilyStore store = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
-                Validator validator = new Validator(request);
-                logger.debug("Queueing validation compaction for " + request);
-                CompactionManager.instance.submitValidation(store, validator);
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
+            TreeRequest remotereq = message.payload;
+            TreeRequest request = new TreeRequest(remotereq.sessionid, message.from, remotereq.range, remotereq.cf);
+
+            // trigger read-only compaction
+            ColumnFamilyStore store = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
+            Validator validator = new Validator(request);
+            logger.debug("Queueing validation compaction for " + request);
+            CompactionManager.instance.submitValidation(store, validator);
         }
     }
 
@@ -486,24 +463,14 @@ public class AntiEntropyService
      * Handler for responses from remote nodes which contain a valid tree.
      * The payload is a completed Validator object from the remote endpoint.
      */
-    public static class TreeResponseVerbHandler implements IVerbHandler
+    public static class TreeResponseVerbHandler implements IVerbHandler<Validator>
     {
-        public void doVerb(MessageIn message, String id)
+        public void doVerb(MessageIn<Validator> message, String id)
         {
-            byte[] bytes = message.getMessageBody();
-            DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes));
-
-            try
-            {
-                // deserialize the remote tree, and register it
-                Validator response = Validator.serializer.deserialize(buffer, message.getVersion());
-                TreeRequest request = new TreeRequest(response.request.sessionid, message.getFrom(), response.request.range, response.request.cf);
-                AntiEntropyService.instance.rendezvous(request, response.tree);
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
+            // deserialize the remote tree, and register it
+            Validator response = message.payload;
+            TreeRequest request = new TreeRequest(response.request.sessionid, message.from, response.request.range, response.request.cf);
+            AntiEntropyService.instance.rendezvous(request, response.tree);
         }
     }
 
@@ -883,15 +850,15 @@ public class AntiEntropyService
                     snapshotLatch = new CountDownLatch(endpoints.size());
                     IAsyncCallback callback = new IAsyncCallback()
                     {
-                            public boolean isLatencyForSnitch()
-                            {
-                                return false;
-                            }
-
-                            public void response(MessageIn msg)
-                            {
-                                RepairJob.this.snapshotLatch.countDown();
-                            }
+                        public boolean isLatencyForSnitch()
+                        {
+                            return false;
+                        }
+
+                        public void response(MessageIn msg)
+                        {
+                            RepairJob.this.snapshotLatch.countDown();
+                        }
                     };
                     for (InetAddress endpoint : endpoints)
                         MessagingService.instance().sendRR(new SnapshotCommand(tablename, cfname, sessionName, false).createMessage(), endpoint, callback);


Mime
View raw message