cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [10/15] Rewrite IncomingTcpConnection to deserialize w/o extra copies to byte[]. MessageIn now has a payload field, and uses the Verb to look up the correct deserializer. REQUEST_RESPONSE deserializer is not uniquely determined by Verb, so we look those
Date Tue, 08 May 2012 17:56:15 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
index 36ed9b3..8005073 100644
--- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
+++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
@@ -28,11 +28,12 @@ import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.thrift.protocol.TMessage;
 
 /**
  * Datacenter Quorum response handler blocks for a quorum of responses from the local DC
  */
-public class DatacenterReadCallback<T> extends ReadCallback<T>
+public class DatacenterReadCallback<TMessage, TResolved> extends ReadCallback<TMessage, TResolved>
 {
     private static final Comparator<InetAddress> localComparator = new Comparator<InetAddress>()
     {
@@ -62,7 +63,7 @@ public class DatacenterReadCallback<T> extends ReadCallback<T>
     @Override
     protected boolean waitingFor(MessageIn message)
     {
-        return localdc.equals(snitch.getDatacenter(message.getFrom()));
+        return localdc.equals(snitch.getDatacenter(message.from));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 149d05b..8996324 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -73,7 +73,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
     {
         String dataCenter = message == null
                             ? localdc
-                            : snitch.getDatacenter(message.getFrom());
+                            : snitch.getDatacenter(message.from);
 
         responses.get(dataCenter).getAndDecrement();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index a61a203..7592ad2 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -65,7 +65,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
     @Override
     public void response(MessageIn message)
     {
-        if (message == null || localdc.equals(snitch.getDatacenter(message.getFrom())))
+        if (message == null || localdc.equals(snitch.getDatacenter(message.from)))
         {
             if (responses.decrementAndGet() == 0)
                 condition.signal();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java
index fe0593b..1c7f6e1 100644
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/IResponseResolver.java
@@ -19,9 +19,10 @@ package org.apache.cassandra.service;
 
 import java.io.IOException;
 
+import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.net.MessageIn;
 
-public interface IResponseResolver<T> {
+public interface IResponseResolver<TMessage, TResolved> {
 
     /**
      * This Method resolves the responses that are passed in . for example : if
@@ -31,17 +32,17 @@ public interface IResponseResolver<T> {
      * repairs . Hence you need to derive a response resolver based on your
      * needs from this interface.
      */
-    public T resolve() throws DigestMismatchException, IOException;
+    public TResolved resolve() throws DigestMismatchException, IOException;
 
     public boolean isDataPresent();
 
     /**
      * returns the data response without comparing with any digests
      */
-    public T getData() throws IOException;
+    public TResolved getData() throws IOException;
 
-    public void preprocess(MessageIn message);
-    public Iterable<MessageIn> getMessages();
+    public void preprocess(MessageIn<TMessage> message);
+    public Iterable<MessageIn<TMessage>> getMessages();
 
     public int getMaxLiveColumns();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index dccb294..1e4d3f3 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -29,15 +29,15 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 
 @Deprecated // 1.1 implements index scan with RangeSliceVerb instead
-public class IndexScanVerbHandler implements IVerbHandler
+public class IndexScanVerbHandler implements IVerbHandler<IndexScanCommand>
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<IndexScanCommand> message, String id)
     {
         try
         {
-            IndexScanCommand command = IndexScanCommand.read(message);
+            IndexScanCommand command = message.payload;
             ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
             List<Row> rows = cfs.search(command.index_clause.expressions,
                                         command.range,
@@ -45,8 +45,8 @@ public class IndexScanVerbHandler implements IVerbHandler
                                         QueryFilter.getFilter(command.predicate, cfs.getComparator()));
             RangeSliceReply reply = new RangeSliceReply(rows);
             if (logger.isDebugEnabled())
-                logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
-            MessagingService.instance().sendReply(reply.createMessage(), id, message.getFrom());
+                logger.debug("Sending " + reply+ " to " + id + "@" + message.from);
+            MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
         }
         catch (Exception ex)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index 2b33a03..ee39b82 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -4,6 +4,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -12,6 +13,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DefsTable;
+import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -48,12 +50,11 @@ class MigrationTask extends WrappedRunnable
                 return;
             }
 
-            IAsyncResult iar = MessagingService.instance().sendRR(message, endpoint);
-
+            IAsyncResult<Collection<RowMutation>> iar = MessagingService.instance().sendRR(message, endpoint);
             try
             {
-                byte[] reply = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-                DefsTable.mergeRemoteSchema(reply, Gossiper.instance.getVersion(endpoint));
+                Collection<RowMutation> schema = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                DefsTable.mergeSchema(schema);
                 return;
             }
             catch(TimeoutException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index d260f37..ba6b6bc 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -25,10 +25,7 @@ import com.google.common.collect.AbstractIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.utils.Pair;
@@ -39,7 +36,7 @@ import org.apache.cassandra.utils.MergeIterator;
  * Turns RangeSliceReply objects into row (string -> CF) maps, resolving
  * to the most recent ColumnFamily and setting up read repairs as necessary.
  */
-public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Row>>
+public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceReply, Iterable<Row>>
 {
     private static final Logger logger = LoggerFactory.getLogger(RangeSliceResponseResolver.class);
 
@@ -53,7 +50,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Ro
 
     private final String table;
     private List<InetAddress> sources;
-    protected final Collection<MessageIn> responses = new LinkedBlockingQueue<MessageIn>();;
+    protected final Collection<MessageIn<RangeSliceReply>> responses = new LinkedBlockingQueue<MessageIn<RangeSliceReply>>();;
     public final List<IAsyncResult> repairResults = new ArrayList<IAsyncResult>();
 
     public RangeSliceResponseResolver(String table)
@@ -68,9 +65,8 @@ public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Ro
 
     public List<Row> getData() throws IOException
     {
-        MessageIn response = responses.iterator().next();
-        RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody(), response.getVersion());
-        return reply.rows;
+        MessageIn<RangeSliceReply> response = responses.iterator().next();
+        return response.payload.rows;
     }
 
     // Note: this would deserialize the response a 2nd time if getData was called first.
@@ -79,11 +75,11 @@ public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Ro
     {
         ArrayList<RowIterator> iters = new ArrayList<RowIterator>(responses.size());
         int n = 0;
-        for (MessageIn response : responses)
+        for (MessageIn<RangeSliceReply> response : responses)
         {
-            RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody(), response.getVersion());
+            RangeSliceReply reply = response.payload;
             n = Math.max(n, reply.rows.size());
-            iters.add(new RowIterator(reply.rows.iterator(), response.getFrom()));
+            iters.add(new RowIterator(reply.rows.iterator(), response.from));
         }
         // for each row, compute the combination of all different versions seen, and repair incomplete versions
         // TODO do we need to call close?
@@ -125,7 +121,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Ro
         public void close() {}
     }
 
-    public Iterable<MessageIn> getMessages()
+    public Iterable<MessageIn<RangeSliceReply>> getMessages()
     {
         return responses;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index a1d60bb..da0a35e 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 
-public class RangeSliceVerbHandler implements IVerbHandler
+public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
 {
     private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
 
@@ -49,7 +49,7 @@ public class RangeSliceVerbHandler implements IVerbHandler
             return cfs.getRangeSlice(command.super_column, command.range, command.maxResults, columnFilter, command.row_filter, command.maxIsColumns, command.isPaging);
     }
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<RangeSliceCommand> message, String id)
     {
         try
         {
@@ -58,11 +58,10 @@ public class RangeSliceVerbHandler implements IVerbHandler
                 /* Don't service reads! */
                 throw new RuntimeException("Cannot service reads while bootstrapping!");
             }
-            RangeSliceCommand command = RangeSliceCommand.read(message);
-            RangeSliceReply reply = new RangeSliceReply(executeLocally(command));
+            RangeSliceReply reply = new RangeSliceReply(executeLocally(message.payload));
             if (logger.isDebugEnabled())
-                logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
-            MessagingService.instance().sendReply(reply.createMessage(), id, message.getFrom());
+                logger.debug("Sending " + reply+ " to " + id + "@" + message.from);
+            MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
         }
         catch (Exception ex)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 66d973b..6608b3e 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -49,14 +49,14 @@ import org.apache.cassandra.utils.WrappedRunnable;
 
 import com.google.common.collect.Lists;
 
-public class ReadCallback<T> implements IAsyncCallback
+public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessage>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
     protected static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
     protected static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 
-    public final IResponseResolver<T> resolver;
+    public final IResponseResolver<TMessage, TResolved> resolver;
     protected final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
     protected final int blockfor;
@@ -67,7 +67,7 @@ public class ReadCallback<T> implements IAsyncCallback
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
+    public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
     {
         this.command = command;
         this.blockfor = determineBlockFor(consistencyLevel, command.getKeyspace());
@@ -126,7 +126,7 @@ public class ReadCallback<T> implements IAsyncCallback
         return ep.subList(0, Math.min(ep.size(), blockfor));
     }
 
-    public T get() throws TimeoutException, DigestMismatchException, IOException
+    public TResolved get() throws TimeoutException, DigestMismatchException, IOException
     {
         long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
         boolean success;
@@ -143,14 +143,14 @@ public class ReadCallback<T> implements IAsyncCallback
         {
             StringBuilder sb = new StringBuilder("");
             for (MessageIn message : resolver.getMessages())
-                sb.append(message.getFrom()).append(", ");
+                sb.append(message.from).append(", ");
             throw new TimeoutException("Operation timed out - received only " + received.get() + " responses from " + sb.toString() + " .");
         }
 
         return blockfor == 1 ? resolver.getData() : resolver.resolve();
     }
 
-    public void response(MessageIn message)
+    public void response(MessageIn<TMessage> message)
     {
         resolver.preprocess(message);
         int n = waitingFor(message)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
index 3f42d85..056f2fb 100644
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java
@@ -38,9 +38,9 @@ public class RowDigestResolver extends AbstractRowResolver
      */
     public Row getData() throws IOException
     {
-        for (Map.Entry<MessageIn, ReadResponse> entry : replies.entrySet())
+        for (MessageIn<ReadResponse> message : replies)
         {
-            ReadResponse result = entry.getValue();
+            ReadResponse result = message.payload;
             if (!result.isDigestQuery())
                 return result.row();
         }
@@ -69,9 +69,10 @@ public class RowDigestResolver extends AbstractRowResolver
         // also extract the data reply, if any.
         ColumnFamily data = null;
         ByteBuffer digest = null;
-        for (Map.Entry<MessageIn, ReadResponse> entry : replies.entrySet())
+
+        for (MessageIn<ReadResponse> message : replies)
         {
-            ReadResponse response = entry.getValue();
+            ReadResponse response = message.payload;
             if (response.isDigestQuery())
             {
                 if (digest == null)
@@ -112,9 +113,9 @@ public class RowDigestResolver extends AbstractRowResolver
 
     public boolean isDataPresent()
     {
-        for (ReadResponse result : replies.values())
+        for (MessageIn<ReadResponse> message : replies)
         {
-            if (!result.isDigestQuery())
+            if (!message.payload.isDigestQuery())
                 return true;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/RowRepairResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowRepairResolver.java b/src/java/org/apache/cassandra/service/RowRepairResolver.java
index 4796836..cb5ba32 100644
--- a/src/java/org/apache/cassandra/service/RowRepairResolver.java
+++ b/src/java/org/apache/cassandra/service/RowRepairResolver.java
@@ -67,14 +67,13 @@ public class RowRepairResolver extends AbstractRowResolver
             List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size());
             List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size());
 
-            for (Map.Entry<MessageIn, ReadResponse> entry : replies.entrySet())
+            for (MessageIn<ReadResponse> message : replies)
             {
-                MessageIn message = entry.getKey();
-                ReadResponse response = entry.getValue();
+                ReadResponse response = message.payload;
                 ColumnFamily cf = response.row().cf;
-                assert !response.isDigestQuery() : "Received digest response to repair read from " + entry.getKey().getFrom();
+                assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from;
                 versions.add(cf);
-                endpoints.add(message.getFrom());
+                endpoints.add(message.from);
 
                 // compute maxLiveColumns to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
                 int liveColumns = cf == null ? 0 : cf.getLiveColumnCount();
@@ -93,7 +92,7 @@ public class RowRepairResolver extends AbstractRowResolver
         }
         else
         {
-            resolved = replies.values().iterator().next().row().cf;
+            resolved = replies.iterator().next().payload.row().cf;
         }
 
         if (logger.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index ff678c2..62316c8 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -27,21 +27,21 @@ import org.apache.cassandra.net.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SnapshotVerbHandler implements IVerbHandler
+public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
 {
     private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class);
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<SnapshotCommand> message, String id)
     {
         try
         {
-            SnapshotCommand command = SnapshotCommand.read(message);
+            SnapshotCommand command = message.payload;
             if (command.clear_snapshot)
                 Table.open(command.keyspace).clearSnapshot(command.snapshot_name);
             else
                 Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
             if (logger.isDebugEnabled())
-                logger.debug("Sending response to snapshot request {} to {} ", command.snapshot_name, message.getFrom());
-            MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.REQUEST_RESPONSE), id, message.getFrom());
+                logger.debug("Sending response to snapshot request {} to {} ", command.snapshot_name, message.from);
+            MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.REQUEST_RESPONSE), id, message.from);
         }
         catch (Exception ex)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 05772d3..9fad95a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -637,7 +637,7 @@ public class StorageProxy implements StorageProxyMBean
         do
         {
             List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
-            ReadCallback<Row>[] readCallbacks = new ReadCallback[commands.size()];
+            ReadCallback<ReadResponse, Row>[] readCallbacks = new ReadCallback[commands.size()];
 
             if (!commandsToRetry.isEmpty())
                 logger.debug("Retrying {} commands", commandsToRetry.size());
@@ -654,7 +654,7 @@ public class StorageProxy implements StorageProxyMBean
                 DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), endpoints);
 
                 RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
-                ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
+                ReadCallback<ReadResponse, Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
                 handler.assureSufficientLiveNodes();
                 assert !handler.endpoints.isEmpty();
                 readCallbacks[i] = handler;
@@ -703,7 +703,7 @@ public class StorageProxy implements StorageProxyMBean
             List<RepairCallback> repairResponseHandlers = null;
             for (int i = 0; i < commands.size(); i++)
             {
-                ReadCallback<Row> handler = readCallbacks[i];
+                ReadCallback<ReadResponse, Row> handler = readCallbacks[i];
                 ReadCommand command = commands.get(i);
                 try
                 {
@@ -796,10 +796,10 @@ public class StorageProxy implements StorageProxyMBean
     static class LocalReadRunnable extends DroppableRunnable
     {
         private final ReadCommand command;
-        private final ReadCallback<Row> handler;
+        private final ReadCallback<ReadResponse, Row> handler;
         private final long start = System.currentTimeMillis();
 
-        LocalReadRunnable(ReadCommand command, ReadCallback<Row> handler)
+        LocalReadRunnable(ReadCommand command, ReadCallback<ReadResponse, Row> handler)
         {
             super(MessagingService.Verb.READ);
             this.command = command;
@@ -819,7 +819,7 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
+    static <TMessage, TResolved> ReadCallback<TMessage, TResolved> getReadCallback(IResponseResolver<TMessage, TResolved> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
     {
         if (consistencyLevel == ConsistencyLevel.LOCAL_QUORUM || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
         {
@@ -880,7 +880,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     // collect replies and resolve according to consistency level
                     RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
-                    ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
+                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
                     handler.assureSufficientLiveNodes();
                     resolver.setSources(handler.endpoints);
                     MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
@@ -947,14 +947,13 @@ public class StorageProxy implements StorageProxyMBean
         final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
         final CountDownLatch latch = new CountDownLatch(liveHosts.size());
 
-        IAsyncCallback cb = new IAsyncCallback()
+        IAsyncCallback<UUID> cb = new IAsyncCallback<UUID>()
         {
-            public void response(MessageIn message)
+            public void response(MessageIn<UUID> message)
             {
                 // record the response from the remote node.
-                logger.debug("Received schema check response from {}", message.getFrom().getHostAddress());
-                UUID theirVersion = UUID.fromString(new String(message.getMessageBody()));
-                versions.put(message.getFrom(), theirVersion);
+                logger.debug("Received schema check response from {}", message.from.getHostAddress());
+                versions.put(message.from, message.payload);
                 latch.countDown();
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index e4f3796..5cb7c7e 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -17,17 +17,23 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.compress.lzf.LZFOutputStream;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
-import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.Header;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -35,11 +41,6 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.Throttle;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-import com.ning.compress.lzf.LZFOutputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class FileStreamTask extends WrappedRunnable
 {
     private static final Logger logger = LoggerFactory.getLogger(FileStreamTask.class);
@@ -196,13 +197,8 @@ public class FileStreamTask extends WrappedRunnable
 
         input.readInt(); // Read total size
         String id = input.readUTF();
-        Header header = Header.serializer().deserialize(input, version);
-
-        int bodySize = input.readInt();
-        byte[] body = new byte[bodySize];
-        input.readFully(body);
-        MessageIn message = new MessageIn(header, body, version);
-        assert message.getVerb() == MessagingService.Verb.STREAM_REPLY : "Non-reply message received on stream socket";
+        MessageIn message = MessageIn.read(input, version, id);
+        assert message.verb == MessagingService.Verb.STREAM_REPLY : "Non-reply message received on stream socket";
         handler.doVerb(message, id);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java b/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
index 88d621b..ff68eaf 100644
--- a/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
@@ -32,10 +32,10 @@ public class ReplicationFinishedVerbHandler implements IVerbHandler
 
     public void doVerb(MessageIn msg, String id)
     {
-        StorageService.instance.confirmReplication(msg.getFrom());
+        StorageService.instance.confirmReplication(msg.from);
         MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
         if (logger.isDebugEnabled())
-            logger.debug("Replying to " + id + "@" + msg.getFrom());
-        MessagingService.instance().sendReply(response, id, msg.getFrom());
+            logger.debug("Replying to " + id + "@" + msg.from);
+        MessagingService.instance().sendReply(response, id, msg.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/streaming/StreamReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java
index a44c215..d207d6e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReply.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReply.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
-class StreamReply
+public class StreamReply
 {
     static enum Status
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
index 1155518..ba9b04d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
@@ -29,36 +29,33 @@ import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 
-public class StreamReplyVerbHandler implements IVerbHandler
+public class StreamReplyVerbHandler implements IVerbHandler<StreamReply>
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<StreamReply> message, String id)
     {
-        byte[] body = message.getMessageBody();
-        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
-
         try
         {
-            StreamReply reply = StreamReply.serializer.deserialize(new DataInputStream(bufIn), message.getVersion());
+            StreamReply reply = message.payload;
             logger.debug("Received StreamReply {}", reply);
-            StreamOutSession session = StreamOutSession.get(message.getFrom(), reply.sessionId);
+            StreamOutSession session = StreamOutSession.get(message.from, reply.sessionId);
             if (session == null)
             {
-                logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.getFrom());
+                logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.from);
                 return;
             }
 
             switch (reply.action)
             {
                 case FILE_FINISHED:
-                    logger.info("Successfully sent {} to {}", reply.file, message.getFrom());
+                    logger.info("Successfully sent {} to {}", reply.file, message.from);
                     session.validateCurrentFile(reply.file);
                     session.startNext();
                     break;
                 case FILE_RETRY:
                     session.validateCurrentFile(reply.file);
-                    logger.info("Need to re-stream file {} to {}", reply.file, message.getFrom());
+                    logger.info("Need to re-stream file {} to {}", reply.file, message.from);
                     session.retry();
                     break;
                 case SESSION_FINISHED:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java b/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
index 561cd1f..3f54f05 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.net.MessagingService;
 *
 * If a file is specified, ranges and table will not. vice-versa should hold as well.
 */
-class StreamRequestMessage // TODO rename to StreamRequest
+public class StreamRequestMessage // TODO rename to StreamRequest
 {
     private static final IVersionedSerializer<StreamRequestMessage> serializer;
     static
@@ -52,7 +52,7 @@ class StreamRequestMessage // TODO rename to StreamRequest
         serializer = new StreamRequestMessageSerializer();
     }
 
-    protected static IVersionedSerializer<StreamRequestMessage> serializer()
+    public static IVersionedSerializer<StreamRequestMessage> serializer()
     {
         return serializer;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
index 6e20f19..e5eed0b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
@@ -32,30 +32,21 @@ import org.apache.cassandra.net.MessageIn;
  * This verb handler handles the StreamRequestMessage that is sent by
  * the node requesting range transfer.
 */
-public class StreamRequestVerbHandler implements IVerbHandler
+public class StreamRequestVerbHandler implements IVerbHandler<StreamRequestMessage>
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamRequestVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn<StreamRequestMessage> message, String id)
     {
         if (logger.isDebugEnabled())
-            logger.debug("Received a StreamRequestMessage from {}", message.getFrom());
-
-        byte[] body = message.getMessageBody();
-        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
-        try
-        {
-            StreamRequestMessage srm = StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
-            if (logger.isDebugEnabled())
-                logger.debug(srm.toString());
-
-            StreamOutSession session = StreamOutSession.create(srm.table, message.getFrom(), srm.sessionId);
-            StreamOut.transferRanges(session, srm.columnFamilies, srm.ranges, srm.type);
-        }
-        catch (IOException ex)
-        {
-            throw new IOError(ex);
-        }
+            logger.debug("Received a StreamRequestMessage from {}", message.from);
+
+        StreamRequestMessage srm = message.payload;
+        if (logger.isDebugEnabled())
+            logger.debug(srm.toString());
+
+        StreamOutSession session = StreamOutSession.create(srm.table, message.from, srm.sessionId);
+        StreamOut.transferRanges(session, srm.columnFamilies, srm.ranges, srm.type);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index 03d0b6e..3074bc0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -50,7 +50,7 @@ public class StreamingRepairTask implements Runnable
 
     // maps of tasks created on this node
     private static final ConcurrentMap<UUID, StreamingRepairTask> tasks = new ConcurrentHashMap<UUID, StreamingRepairTask>();
-    private static final StreamingRepairTaskSerializer serializer = new StreamingRepairTaskSerializer();
+    public static final StreamingRepairTaskSerializer serializer = new StreamingRepairTaskSerializer();
 
     public final UUID id;
     private final InetAddress owner; // the node where the task is created; can be == src but don't need to
@@ -186,54 +186,30 @@ public class StreamingRepairTask implements Runnable
         };
     }
 
-    public static class StreamingRepairRequest implements IVerbHandler
+    public static class StreamingRepairRequest implements IVerbHandler<StreamingRepairTask>
     {
-        public void doVerb(MessageIn message, String id)
+        public void doVerb(MessageIn<StreamingRepairTask> message, String id)
         {
-            byte[] bytes = message.getMessageBody();
-            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
-
-            StreamingRepairTask task;
-            try
-            {
-                task = StreamingRepairTask.serializer.deserialize(dis, message.getVersion());
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-
+            StreamingRepairTask task = message.payload;
             assert task.src.equals(FBUtilities.getBroadcastAddress());
-            assert task.owner.equals(message.getFrom());
+            assert task.owner.equals(message.from);
 
-            logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", task.id, message.getFrom(), task.ranges.size(), task.dst));
+            logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", task.id, message.from, task.ranges.size(), task.dst));
 
             task.run();
         }
 
     }
 
-    public static class StreamingRepairResponse implements IVerbHandler
+    public static class StreamingRepairResponse implements IVerbHandler<UUID>
     {
-        public void doVerb(MessageIn message, String id)
+        public void doVerb(MessageIn<UUID> message, String id)
         {
-            byte[] bytes = message.getMessageBody();
-            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
-
-            UUID taskid;
-            try
-            {
-                taskid = UUIDGen.serializer.deserialize(dis, message.getVersion());
-            }
-            catch (IOException e)
-            {
-                throw new IOError(new IOException("Error reading stream repair response from " + message.getFrom(), e));
-            }
-
+            UUID taskid = message.payload;
             StreamingRepairTask task = tasks.get(taskid);
             if (task == null)
             {
-                logger.error(String.format("Received a stream repair response from %s for unknow taks %s (have this node been restarted recently?)", message.getFrom(), taskid));
+                logger.error(String.format("Received a stream repair response from %s for unknow taks %s (have this node been restarted recently?)", message.from, taskid));
                 return;
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index 0ea99ef..b03bc25 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -29,9 +29,10 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.CallbackInfo;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessageSerializer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
@@ -50,8 +51,6 @@ import java.util.HashMap;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
-    private static MessageSerializer messageSerializer = new MessageSerializer();
-
     private void testRangeSliceCommandWrite() throws IOException
     {
         ByteBuffer startCol = ByteBufferUtil.bytes("Start");
@@ -93,10 +92,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
         DataInputStream in = getInput("db.RangeSliceCommand.bin");
         for (int i = 0; i < 6; i++)
-        {
-            MessageIn msg = messageSerializer.deserialize(in, getVersion());
-            RangeSliceCommand cmd = RangeSliceCommand.read(msg);
-        }
+            MessageIn.read(in, getVersion(), "id");
         in.close();
     }
 
@@ -126,8 +122,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert SliceByNamesReadCommand.serializer().deserialize(in, getVersion()) != null;
         assert ReadCommand.serializer().deserialize(in, getVersion()) != null;
         assert ReadCommand.serializer().deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
         in.close();
     }
 
@@ -156,8 +152,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert SliceFromReadCommand.serializer().deserialize(in, getVersion()) != null;
         assert ReadCommand.serializer().deserialize(in, getVersion()) != null;
         assert ReadCommand.serializer().deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
         in.close();
     }
 
@@ -228,12 +224,12 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert RowMutation.serializer().deserialize(in, getVersion()) != null;
         assert RowMutation.serializer().deserialize(in, getVersion()) != null;
         assert RowMutation.serializer().deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
         in.close();
     }
 
@@ -264,9 +260,14 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert Truncation.serializer().deserialize(in, getVersion()) != null;
         assert TruncateResponse.serializer().deserialize(in, getVersion()) != null;
         assert TruncateResponse.serializer().deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+
+        // set up some fake callbacks so deserialization knows that what it's deserializing is a TruncateResponse
+        MessagingService.instance().setCallbackForTests("tr1", new CallbackInfo(null, null, TruncateResponse.serializer()));
+        MessagingService.instance().setCallbackForTests("tr2", new CallbackInfo(null, null, TruncateResponse.serializer()));
+
+        assert MessageIn.read(in, getVersion(), "tr1") != null;
+        assert MessageIn.read(in, getVersion(), "tr2") != null;
         in.close();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/test/unit/org/apache/cassandra/net/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessageSerializer.java b/test/unit/org/apache/cassandra/net/MessageSerializer.java
deleted file mode 100644
index ec0ab1d..0000000
--- a/test/unit/org/apache/cassandra/net/MessageSerializer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.apache.cassandra.net;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.*;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-
-public class MessageSerializer implements IVersionedSerializer<MessageIn>
-{
-    // TODO imitate backwards-compatibility code from OutboundTcpConnection here
-    public void serialize(MessageIn t, DataOutput dos, int version) throws IOException
-    {
-        Header.serializer().serialize(t.header, dos, version);
-        byte[] bytes = t.getMessageBody();
-        dos.writeInt(bytes.length);
-        dos.write(bytes);
-    }
-
-    public MessageIn deserialize(DataInput dis, int version) throws IOException
-    {
-        Header header = Header.serializer().deserialize(dis, version);
-        int size = dis.readInt();
-        byte[] bytes = new byte[size];
-        dis.readFully(bytes);
-        return new MessageIn(header, bytes, version);
-    }
-
-    public long serializedSize(MessageIn message, int version)
-    {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 418db98..0ae9c38 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -171,7 +171,7 @@ public class RemoveTest
     {
         public MessageIn handleMessage(MessageIn msg, String id, InetAddress to)
         {
-            if (!msg.getVerb().equals(MessagingService.Verb.STREAM_REQUEST))
+            if (!msg.verb.equals(MessagingService.Verb.STREAM_REQUEST))
                 return msg;
 
             StreamUtil.finishStreamRequest(msg, to);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 6ab8685..43752bf 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -24,8 +24,6 @@ package org.apache.cassandra.service;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.junit.Test;
 
@@ -34,7 +32,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.MessageSerializer;
+import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 
@@ -45,8 +43,6 @@ public class SerializationsTest extends AbstractSerializationsTester
         System.setProperty("cassandra.partitioner", "RandomPartitioner");
     }
 
-    private static MessageSerializer messageSerializer = new MessageSerializer();
-
     public static Range<Token> FULL_RANGE = new Range<Token>(StorageService.getPartitioner().getMinimumToken(), StorageService.getPartitioner().getMinimumToken());
 
     private void testTreeRequestWrite() throws IOException
@@ -65,7 +61,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
         DataInputStream in = getInput("service.TreeRequest.bin");
         assert AntiEntropyService.TreeRequest.serializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
         in.close();
     }
 
@@ -98,8 +94,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         DataInputStream in = getInput("service.TreeResponse.bin");
         assert AntiEntropyService.Validator.serializer.deserialize(in, getVersion()) != null;
         assert AntiEntropyService.Validator.serializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
         in.close();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
index f2ed9eb..ebc181a 100644
--- a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
@@ -43,15 +43,13 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.net.MessageSerializer;
+import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
-    private static MessageSerializer messageSerializer = new MessageSerializer();
-
     private void testPendingFileWrite() throws IOException
     {
         // make sure to test serializing null and a pf with no sstable.
@@ -133,7 +131,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
         DataInputStream in = getInput("streaming.StreamReply.bin");
         assert StreamReply.serializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
         in.close();
     }
 
@@ -176,9 +174,9 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert StreamRequestMessage.serializer().deserialize(in, getVersion()) != null;
         assert StreamRequestMessage.serializer().deserialize(in, getVersion()) != null;
         assert StreamRequestMessage.serializer().deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
-        assert messageSerializer.deserialize(in, getVersion()) != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), "id") != null;
         in.close();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/test/unit/org/apache/cassandra/streaming/StreamUtil.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamUtil.java b/test/unit/org/apache/cassandra/streaming/StreamUtil.java
index c0bed04..0719eb4 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamUtil.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamUtil.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.streaming;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
 import java.net.InetAddress;
 
 import org.apache.cassandra.net.MessageIn;
@@ -28,26 +30,20 @@ import org.apache.cassandra.net.MessagingService;
 
 public class StreamUtil
 {
-
     /**
      * Takes an stream request message and creates an empty status response. Exists here because StreamRequestMessage
      * is package protected.
      */
-    static public void finishStreamRequest(MessageIn msg, InetAddress to)
+    static public void finishStreamRequest(MessageIn<StreamRequestMessage> msg, InetAddress to)
     {
-        byte[] body = msg.getMessageBody();
-        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-
+        StreamInSession session = StreamInSession.get(to, msg.payload.sessionId);
         try
         {
-            StreamRequestMessage srm = StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn), MessagingService.current_version);
-            StreamInSession session = StreamInSession.get(to, srm.sessionId);
             session.closeIfFinished();
         }
-        catch (Exception e)
+        catch (IOException e)
         {
-            System.err.println(e);
-            e.printStackTrace();
+            throw new IOError(e);
         }
     }
 }


Mime
View raw message