cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [14/15] Introduce MessageOut class, which wraps an object to be sent in the "payload" field. The old Header class is inlined into the "parameters" map. patch by jbellis; reviewed by yukim for CASSANDRA-3617
Date Tue, 08 May 2012 17:56:15 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 4f34358..2aa75ef 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -25,11 +25,11 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -37,10 +37,7 @@ public class OutboundTcpConnection extends Thread
 {
     private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class);
 
-    private static final Message CLOSE_SENTINEL = new Message(FBUtilities.getBroadcastAddress(),
-                                                              StorageService.Verb.INTERNAL_RESPONSE,
-                                                              ArrayUtils.EMPTY_BYTE_ARRAY,
-                                                              MessagingService.current_version);
+    private static final MessageOut CLOSE_SENTINEL = new MessageOut(StorageService.Verb.INTERNAL_RESPONSE);
 
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
 
@@ -62,7 +59,7 @@ public class OutboundTcpConnection extends Thread
         this.poolReference = pool;
     }
 
-    public void enqueue(Message message, String id)
+    public void enqueue(MessageOut<?> message, String id)
     {
         expireMessages();
         try
@@ -109,7 +106,7 @@ public class OutboundTcpConnection extends Thread
                 active = tmp;
             }
 
-            Message m = entry.message;
+            MessageOut<?> m = entry.message;
             String id = entry.id;
             if (m == CLOSE_SENTINEL)
             {
@@ -141,7 +138,7 @@ public class OutboundTcpConnection extends Thread
         return dropped.get();
     }
 
-    private void writeConnected(Message message, String id)
+    private void writeConnected(MessageOut<?> message, String id)
     {
         try
         {
@@ -163,7 +160,12 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
-    public static void write(Message message, String id, DataOutputStream out) throws IOException
+    public void write(MessageOut<?> message, String id, DataOutputStream out) throws IOException
+    {
+        write(message, id, out, Gossiper.instance.getVersion(poolReference.endPoint()));
+    }
+
+    public static void write(MessageOut message, String id, DataOutputStream out, int version) throws IOException
     {
         /*
          Setting up the protocol header. This is 4 bytes long
@@ -182,18 +184,17 @@ public class OutboundTcpConnection extends Thread
         if (false)
             header |= 4;
         // Setting up the version bit
-        header |= (message.getVersion() << 8);
+        header |= (version << 8);
 
         out.writeInt(MessagingService.PROTOCOL_MAGIC);
         out.writeInt(header);
-        // compute total Message length for compatibility w/ 0.8 and earlier
-        byte[] bytes = message.getMessageBody();
-        int total = messageLength(message.header, id, bytes);
-        out.writeInt(total);
+
+        // 0.8 included a total message size int.  1.0 doesn't need it but expects it to be there.
+        if (version <= MessagingService.VERSION_11)
+            out.writeInt(-1);
+
         out.writeUTF(id);
-        Header.serializer().serialize(message.header, out, message.getVersion());
-        out.writeInt(bytes.length);
-        out.write(bytes);
+        message.serialize(out, version);
     }
 
     public static int messageLength(Header header, String id, byte[] bytes)
@@ -277,11 +278,11 @@ public class OutboundTcpConnection extends Thread
 
     private static class Entry
     {
-        final Message message;
+        final MessageOut<?> message;
         final String id;
         final long timestamp;
 
-        Entry(Message message, String id, long timestamp)
+        Entry(MessageOut<?> message, String id, long timestamp)
         {
             this.message = message;
             this.id = id;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 3a8ec12..d86022b 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -51,9 +51,9 @@ public class OutboundTcpConnectionPool
      * returns the appropriate connection based on message type.
      * returns null if a connection could not be established.
      */
-    OutboundTcpConnection getConnection(Message msg)
+    OutboundTcpConnection getConnection(MessageOut msg)
     {
-        Stage stage = msg.getMessageType();
+        Stage stage = msg.getStage();
         return stage == Stage.REQUEST_RESPONSE || stage == Stage.INTERNAL_RESPONSE || stage == Stage.GOSSIP
                ? ackCon
                : cmdCon;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/net/sink/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/sink/IMessageSink.java b/src/java/org/apache/cassandra/net/sink/IMessageSink.java
index 994f121..8d0ea2e 100644
--- a/src/java/org/apache/cassandra/net/sink/IMessageSink.java
+++ b/src/java/org/apache/cassandra/net/sink/IMessageSink.java
@@ -20,8 +20,11 @@ package org.apache.cassandra.net.sink;
 import java.net.InetAddress;
 
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageOut;
 
 public interface IMessageSink
 {
+    public MessageOut handleMessage(MessageOut message, String id, InetAddress to);
+
     public Message handleMessage(Message message, String id, InetAddress to);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/net/sink/SinkManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/sink/SinkManager.java b/src/java/org/apache/cassandra/net/sink/SinkManager.java
index 2cf67a6..e0d0cd5 100644
--- a/src/java/org/apache/cassandra/net/sink/SinkManager.java
+++ b/src/java/org/apache/cassandra/net/sink/SinkManager.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageOut;
 
 public class SinkManager
 {
@@ -37,7 +38,7 @@ public class SinkManager
         sinks.clear();
     }
 
-    public static Message processClientMessage(Message message, String id, InetAddress to)
+    public static MessageOut processOutboundMessage(MessageOut message, String id, InetAddress to)
     {
         if (sinks.isEmpty())
             return message;
@@ -51,7 +52,7 @@ public class SinkManager
         return message;
     }
 
-    public static Message processServerMessage(Message message, String id)
+    public static Message processInboundMessage(Message message, String id)
     {
         if (sinks.isEmpty())
             return message;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index c923bde..e339363 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -26,31 +26,29 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 
 import com.google.common.base.Objects;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.streaming.StreamingRepairTask;
 import org.apache.cassandra.utils.*;
 
 /**
@@ -223,6 +221,19 @@ public class AntiEntropyService
     }
 
     /**
+<<<<<<< HEAD
+=======
+     * Requests a tree from the given node, and returns the request that was sent.
+     */
+    TreeRequest request(String sessionid, InetAddress remote, Range<Token> range, String ksname, String cfname)
+    {
+        TreeRequest request = new TreeRequest(sessionid, remote, range, new CFPair(ksname, cfname));
+        MessagingService.instance().sendOneWay(request.createMessage(), remote);
+        return request;
+    }
+
+    /**
+>>>>>>> ddbe4e6... MessageOut
      * Responds to the node that requested the given valid tree.
      * @param validator A locally generated validator
      * @param local localhost (parameterized for testing)
@@ -233,10 +244,9 @@ public class AntiEntropyService
 
         try
         {
-            Message message = TreeResponseVerbHandler.makeVerb(local, validator);
             if (!validator.request.endpoint.equals(FBUtilities.getBroadcastAddress()))
                 logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s", validator.request.sessionid, validator.request.endpoint, validator.request.cf));
-            ms.sendOneWay(message, validator.request.endpoint);
+            ms.sendOneWay(validator.createMessage(), validator.request.endpoint);
         }
         catch (Exception e)
         {
@@ -264,6 +274,7 @@ public class AntiEntropyService
         private transient DecoratedKey lastKey;
 
         public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);
+        public static ValidatorSerializer serializer = new ValidatorSerializer();
 
         public Validator(TreeRequest request)
         {
@@ -404,54 +415,47 @@ public class AntiEntropyService
             // respond to the request that triggered this validation
             AntiEntropyService.instance.respond(this, FBUtilities.getBroadcastAddress());
         }
-    }
 
-    /**
-     * Handler for requests from remote nodes to generate a valid tree.
-     * The payload is a CFPair representing the columnfamily to validate.
-     */
-    public static class TreeRequestVerbHandler implements IVerbHandler
-    {
-        public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();
-        static Message makeVerb(TreeRequest request, int version)
+        public MessageOut<Validator> createMessage()
         {
-            try
-            {
-                FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-                DataOutputStream dos = new DataOutputStream(bos);
-                SERIALIZER.serialize(request, dos, version);
-                return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
-            }
-            catch(IOException e)
-            {
-                throw new RuntimeException(e);
-            }
+            return new MessageOut<Validator>(StorageService.Verb.TREE_RESPONSE, this, Validator.serializer);
         }
 
-        public void serialize(TreeRequest request, DataOutput dos, int version) throws IOException
+        public static class ValidatorSerializer implements IVersionedSerializer<Validator>
         {
-            dos.writeUTF(request.sessionid);
-            CompactEndpointSerializationHelper.serialize(request.endpoint, dos);
-            dos.writeUTF(request.cf.left);
-            dos.writeUTF(request.cf.right);
-            if (version > MessagingService.VERSION_07)
-                AbstractBounds.serializer().serialize(request.range, dos, version);
-        }
+            public void serialize(Validator validator, DataOutput dos, int version) throws IOException
+            {
+                TreeRequest.serializer.serialize(validator.request, dos, version);
+                MerkleTree.serializer.serialize(validator.tree, dos, version);
+            }
 
-        public TreeRequest deserialize(DataInput dis, int version) throws IOException
-        {
-            String sessId = dis.readUTF();
-            InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(dis);
-            CFPair cfpair = new CFPair(dis.readUTF(), dis.readUTF());
-            Range<Token> range;
-            if (version > MessagingService.VERSION_07)
-                range = (Range<Token>) AbstractBounds.serializer().deserialize(dis, version);
-            else
-                range = new Range<Token>(StorageService.getPartitioner().getMinimumToken(), StorageService.getPartitioner().getMinimumToken());
+            public Validator deserialize(DataInput dis, int version) throws IOException
+            {
+                final TreeRequest request = TreeRequest.serializer.deserialize(dis, version);
+                try
+                {
+                    return new Validator(request, MerkleTree.serializer.deserialize(dis, version));
+                }
+                catch(Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
 
-            return new TreeRequest(sessId, endpoint, range, cfpair);
+            public long serializedSize(Validator validator, int version)
+            {
+                return TreeRequest.serializer.serializedSize(validator.request, version)
+                       + MerkleTree.serializer.serializedSize(validator.tree, version);
+            }
         }
+    }
 
+    /**
+     * Handler for requests from remote nodes to generate a valid tree.
+     * The payload is a CFPair representing the columnfamily to validate.
+     */
+    public static class TreeRequestVerbHandler implements IVerbHandler
+    {
         /**
          * Trigger a validation compaction which will return the tree upon completion.
          */
@@ -462,7 +466,7 @@ public class AntiEntropyService
             DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes));
             try
             {
-                TreeRequest remotereq = this.deserialize(buffer, message.getVersion());
+                TreeRequest remotereq = TreeRequest.serializer.deserialize(buffer, message.getVersion());
                 TreeRequest request = new TreeRequest(remotereq.sessionid, message.getFrom(), remotereq.range, remotereq.cf);
 
                 // trigger readonly-compaction
@@ -484,45 +488,6 @@ public class AntiEntropyService
      */
     public static class TreeResponseVerbHandler implements IVerbHandler
     {
-        public static final TreeResponseVerbHandler SERIALIZER = new TreeResponseVerbHandler();
-        static Message makeVerb(InetAddress local, Validator validator)
-        {
-            try
-            {
-                FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-                DataOutputStream dos = new DataOutputStream(bos);
-                SERIALIZER.serialize(validator, dos, Gossiper.instance.getVersion(validator.request.endpoint));
-                return new Message(local,
-                                   StorageService.Verb.TREE_RESPONSE,
-                                   bos.toByteArray(),
-                                   Gossiper.instance.getVersion(validator.request.endpoint));
-            }
-            catch(IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        public void serialize(Validator v, DataOutputStream dos, int version) throws IOException
-        {
-            TreeRequestVerbHandler.SERIALIZER.serialize(v.request, dos, version);
-            MerkleTree.serializer.serialize(v.tree, dos, version);
-            dos.flush();
-        }
-
-        public Validator deserialize(DataInputStream dis, int version) throws IOException
-        {
-            final TreeRequest request = TreeRequestVerbHandler.SERIALIZER.deserialize(dis, version);
-            try
-            {
-                return new Validator(request, MerkleTree.serializer.deserialize(dis, version));
-            }
-            catch(Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
         public void doVerb(Message message, String id)
         {
             byte[] bytes = message.getMessageBody();
@@ -531,7 +496,7 @@ public class AntiEntropyService
             try
             {
                 // deserialize the remote tree, and register it
-                Validator response = this.deserialize(buffer, message.getVersion());
+                Validator response = Validator.serializer.deserialize(buffer, message.getVersion());
                 TreeRequest request = new TreeRequest(response.request.sessionid, message.getFrom(), response.request.range, response.request.cf);
                 AntiEntropyService.instance.rendezvous(request, response.tree);
             }
@@ -559,6 +524,8 @@ public class AntiEntropyService
      */
     public static class TreeRequest
     {
+        public static final TreeRequestSerializer serializer = new TreeRequestSerializer();
+
         public final String sessionid;
         public final InetAddress endpoint;
         public final Range<Token> range;
@@ -593,6 +560,43 @@ public class AntiEntropyService
         {
             return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + cf + ", " + range + ">";
         }
+
+        public MessageOut<TreeRequest> createMessage()
+        {
+            return new MessageOut<TreeRequest>(StorageService.Verb.TREE_REQUEST, this, TreeRequest.serializer);
+        }
+
+        public static class TreeRequestSerializer implements IVersionedSerializer<TreeRequest>
+        {
+            public void serialize(TreeRequest request, DataOutput dos, int version) throws IOException
+            {
+                dos.writeUTF(request.sessionid);
+                CompactEndpointSerializationHelper.serialize(request.endpoint, dos);
+                dos.writeUTF(request.cf.left);
+                dos.writeUTF(request.cf.right);
+                AbstractBounds.serializer().serialize(request.range, dos, version);
+            }
+
+            public TreeRequest deserialize(DataInput dis, int version) throws IOException
+            {
+                String sessId = dis.readUTF();
+                InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(dis);
+                CFPair cfpair = new CFPair(dis.readUTF(), dis.readUTF());
+                Range<Token> range;
+                range = (Range<Token>) AbstractBounds.serializer().deserialize(dis, version);
+
+                return new TreeRequest(sessId, endpoint, range, cfpair);
+            }
+
+            public long serializedSize(TreeRequest request, int version)
+            {
+                return 2 + FBUtilities.encodedUTF8Length(request.sessionid)
+                     + CompactEndpointSerializationHelper.serializedSize(request.endpoint)
+                     + 2 + FBUtilities.encodedUTF8Length(request.cf.left)
+                     + 2 + FBUtilities.encodedUTF8Length(request.cf.right)
+                     + AbstractBounds.serializer().serializedSize(request.range, version);
+            }
+        }
     }
 
     /**
@@ -840,7 +844,7 @@ public class AntiEntropyService
                 {
                     public void send(TreeRequest r)
                     {
-                        MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(r, Gossiper.instance.getVersion(r.endpoint)), r.endpoint);
+                        MessagingService.instance().sendOneWay(r.createMessage(), r.endpoint);
                     }
                 };
                 this.differencers = new RequestCoordinator<Differencer>(isSequential)
@@ -879,20 +883,18 @@ public class AntiEntropyService
                     snapshotLatch = new CountDownLatch(endpoints.size());
                     IAsyncCallback callback = new IAsyncCallback()
                     {
-                        @Override
                             public boolean isLatencyForSnitch()
                             {
                                 return false;
                             }
 
-                        @Override
                             public void response(Message msg)
                             {
                                 RepairJob.this.snapshotLatch.countDown();
                             }
                     };
                     for (InetAddress endpoint : endpoints)
-                        MessagingService.instance().sendRR(new SnapshotCommand(tablename, cfname, sessionName, false), endpoint, callback);
+                        MessagingService.instance().sendRR(new SnapshotCommand(tablename, cfname, sessionName, false).createMessage(), endpoint, callback);
                     snapshotLatch.await();
                     snapshotLatch = null;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index 3b0c90b..27970aa 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -44,10 +44,9 @@ public class IndexScanVerbHandler implements IVerbHandler
                                         command.index_clause.count,
                                         QueryFilter.getFilter(command.predicate, cfs.getComparator()));
             RangeSliceReply reply = new RangeSliceReply(rows);
-            Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
                 logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
-            MessagingService.instance().sendReply(response, id, message.getFrom());
+            MessagingService.instance().sendReply(reply.createMessage(), id, message.getFrom());
         }
         catch (Exception ex)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 0d7bf4f..68d0009 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -17,10 +17,7 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOError;
-import java.io.IOException;
+import java.io.*;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -30,8 +27,6 @@ import java.util.concurrent.Future;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,30 +35,31 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.gms.*;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
-import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.commons.lang.ArrayUtils;
 
 public class MigrationManager implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
 
     // try that many times to send migration request to the node before giving up
-    private static final int MIGRATION_REQUEST_RETRIES = 3;
+    static final int MIGRATION_REQUEST_RETRIES = 3;
     private static final ByteBuffer LAST_MIGRATION_KEY = ByteBufferUtil.bytes("Last Migration");
 
     public void onJoin(InetAddress endpoint, EndpointState epState)
@@ -192,15 +188,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
 
     private static void pushSchemaMutation(InetAddress endpoint, Collection<RowMutation> schema)
     {
-        try
-        {
-            Message msg = makeMigrationMessage(schema, Gossiper.instance.getVersion(endpoint));
-            MessagingService.instance().sendOneWay(msg, endpoint);
-        }
-        catch (IOException ex)
-        {
-            throw new IOError(ex);
-        }
+        MessageOut<Collection<RowMutation>> msg = new MessageOut<Collection<RowMutation>>(StorageService.Verb.DEFINITIONS_UPDATE,
+                                                                                          schema,
+                                                                                          MigrationsSerializer.instance);
+        MessagingService.instance().sendOneWay(msg, endpoint);
     }
 
     // Returns a future on the local application of the schema
@@ -243,69 +234,6 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
     }
 
     /**
-     * Serialize given row mutations into raw bytes and make a migration message
-     * (other half of transformation is in DefinitionsUpdateResponseVerbHandler.)
-     *
-     * @param schema The row mutations to send to remote nodes
-     * @param version The version to use for message
-     *
-     * @return Serialized migration containing schema mutations
-     *
-     * @throws IOException on failed serialization
-     */
-    private static Message makeMigrationMessage(Collection<RowMutation> schema, int version) throws IOException
-    {
-        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.DEFINITIONS_UPDATE, serializeSchema(schema, version), version);
-    }
-
-    /**
-     * Serialize given row mutations into raw bytes
-     *
-     * @param schema The row mutations to serialize
-     * @param version The version of the message service to use for serialization
-     *
-     * @return serialized mutations
-     *
-     * @throws IOException on failed serialization
-     */
-    public static byte[] serializeSchema(Collection<RowMutation> schema, int version) throws IOException
-    {
-        FastByteArrayOutputStream bout = new FastByteArrayOutputStream();
-        DataOutputStream dout = new DataOutputStream(bout);
-        dout.writeInt(schema.size());
-
-        for (RowMutation mutation : schema)
-            RowMutation.serializer().serialize(mutation, dout, version);
-
-        dout.close();
-
-        return bout.toByteArray();
-    }
-
-    /**
-     * Deserialize migration message considering data compatibility starting from version 1.1
-     *
-     * @param data The data of the message from coordinator which hold schema mutations to apply
-     * @param version The version of the message
-     *
-     * @return The collection of the row mutations to apply on the node (aka schema)
-     *
-     * @throws IOException if message is of incompatible version or data is corrupted
-     */
-    public static Collection<RowMutation> deserializeMigrationMessage(byte[] data, int version) throws IOException
-    {
-        Collection<RowMutation> schema = new ArrayList<RowMutation>();
-        DataInputStream in = new DataInputStream(new FastByteArrayInputStream(data));
-
-        int count = in.readInt();
-
-        for (int i = 0; i < count; i++)
-            schema.add(RowMutation.serializer().deserialize(in, version));
-
-        return schema;
-    }
-
-    /**
      * Clear all locally stored schema information and reset schema to initial state.
      * Called by user (via JMX) who wants to get rid of schema disagreement.
      *
@@ -381,45 +309,35 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
             return UUIDGen.getUUID(cf.getColumn(LAST_MIGRATION_KEY).value());
     }
 
-    static class MigrationTask extends WrappedRunnable
+    public static class MigrationsSerializer implements IVersionedSerializer<Collection<RowMutation>>
     {
-        private final InetAddress endpoint;
+        public static MigrationsSerializer instance = new MigrationsSerializer();
 
-        MigrationTask(InetAddress endpoint)
+        public void serialize(Collection<RowMutation> schema, DataOutput out, int version) throws IOException
         {
-            this.endpoint = endpoint;
+            out.writeInt(schema.size());
+            for (RowMutation rm : schema)
+                RowMutation.serializer().serialize(rm, out, version);
         }
 
-        public void runMayThrow() throws Exception
+        public Collection<RowMutation> deserialize(DataInput in, int version) throws IOException
         {
-            Message message = new Message(FBUtilities.getBroadcastAddress(),
-                                          StorageService.Verb.MIGRATION_REQUEST,
-                                          ArrayUtils.EMPTY_BYTE_ARRAY,
-                                          Gossiper.instance.getVersion(endpoint));
+            int count = in.readInt();
+            Collection<RowMutation> schema = new ArrayList<RowMutation>(count);
 
-            int retries = 0;
-            while (retries < MIGRATION_REQUEST_RETRIES)
-            {
-                if (!FailureDetector.instance.isAlive(endpoint))
-                {
-                    logger.error("Can't send migration request: node {} is down.", endpoint);
-                    return;
-                }
-
-                IAsyncResult iar = MessagingService.instance().sendRR(message, endpoint);
+            for (int i = 0; i < count; i++)
+                schema.add(RowMutation.serializer().deserialize(in, version));
 
-                try
-                {
-                    byte[] reply = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+            return schema;
+        }
 
-                    DefsTable.mergeRemoteSchema(reply, message.getVersion());
-                    return;
-                }
-                catch(TimeoutException e)
-                {
-                    retries++;
-                }
-            }
+        public long serializedSize(Collection<RowMutation> schema, int version)
+        {
+            int size = DBConstants.INT_SIZE;
+            for (RowMutation rm : schema)
+                size += RowMutation.serializer().serializedSize(rm, version);
+            return size;
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
new file mode 100644
index 0000000..982bc8e
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -0,0 +1,83 @@
+package org.apache.cassandra.service;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DefsTable;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+class MigrationTask extends WrappedRunnable
+{
+    private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);
+
+    private static final MigrationTaskSerializer serializer = new MigrationTaskSerializer();
+
+    private final InetAddress endpoint;
+
+    MigrationTask(InetAddress endpoint)
+    {
+        this.endpoint = endpoint;
+    }
+
+    public void runMayThrow() throws Exception
+    {
+        MessageOut<MigrationTask> message = new MessageOut<MigrationTask>(StorageService.Verb.MIGRATION_REQUEST,
+                                                                          this,
+                                                                          serializer);
+
+        int retries = 0;
+        while (retries < MigrationManager.MIGRATION_REQUEST_RETRIES)
+        {
+            if (!FailureDetector.instance.isAlive(endpoint))
+            {
+                logger.error("Can't send migration request: node {} is down.", endpoint);
+                return;
+            }
+
+            IAsyncResult iar = MessagingService.instance().sendRR(message, endpoint);
+
+            try
+            {
+                byte[] reply = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                DefsTable.mergeRemoteSchema(reply, Gossiper.instance.getVersion(endpoint));
+                return;
+            }
+            catch(TimeoutException e)
+            {
+                retries++;
+            }
+        }
+    }
+
+    private static class MigrationTaskSerializer implements IVersionedSerializer<MigrationTask>
+    {
+        public void serialize(MigrationTask task, DataOutput out, int version) throws IOException
+        {
+            // the recipient only needs our reply-to address, which it gets from the connection, so the payload is empty
+        }
+
+        public MigrationTask deserialize(DataInput in, int version) throws IOException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public long serializedSize(MigrationTask migrationTask, int version)
+        {
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index 0de51c7..49d9ef1 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -60,10 +60,9 @@ public class RangeSliceVerbHandler implements IVerbHandler
             }
             RangeSliceCommand command = RangeSliceCommand.read(message);
             RangeSliceReply reply = new RangeSliceReply(executeLocally(command));
-            Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
                 logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
-            MessagingService.instance().sendReply(response, id, message.getFrom());
+            MessagingService.instance().sendReply(reply.createMessage(), id, message.getFrom());
         }
         catch (Exception ex)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 25f5ee6..8ac9981 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
@@ -256,8 +257,9 @@ public class ReadCallback<T> implements IAsyncCallback
                 final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key);
                 IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
+                MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();
                 for (InetAddress endpoint : endpoints)
-                    MessagingService.instance().sendRR(readCommand, endpoint, repairHandler);
+                    MessagingService.instance().sendRR(message, endpoint, repairHandler);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/service/RowRepairResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowRepairResolver.java b/src/java/org/apache/cassandra/service/RowRepairResolver.java
index 149f094..00fc4ad 100644
--- a/src/java/org/apache/cassandra/service/RowRepairResolver.java
+++ b/src/java/org/apache/cassandra/service/RowRepairResolver.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
@@ -32,9 +31,9 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
@@ -120,18 +119,10 @@ public class RowRepairResolver extends AbstractRowResolver
             // create and send the row mutation message based on the diff
             RowMutation rowMutation = new RowMutation(table, key.key);
             rowMutation.add(diffCf);
-            Message repairMessage;
-            try
-            {
-                // use a separate verb here because we don't want these to be get the white glove hint-
-                // on-timeout behavior that a "real" mutation gets
-                repairMessage = rowMutation.getMessage(StorageService.Verb.READ_REPAIR,
-                                                       Gossiper.instance.getVersion(endpoints.get(i)));
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
+            MessageOut repairMessage;
+            // use a separate verb here because we don't want these to get the white glove hint-
+            // on-timeout behavior that a "real" mutation gets
+            repairMessage = rowMutation.createMessage(StorageService.Verb.READ_REPAIR);
             results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i)));
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index f128470..b00901f 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -21,6 +21,7 @@ import org.apache.cassandra.db.SnapshotCommand;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.slf4j.Logger;
@@ -38,10 +39,9 @@ public class SnapshotVerbHandler implements IVerbHandler
                 Table.open(command.keyspace).clearSnapshot(command.snapshot_name);
             else
                 Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
-            Message response = message.getReply(FBUtilities.getBroadcastAddress(), new byte[0], MessagingService.current_version);
             if (logger.isDebugEnabled())
                 logger.debug("Sending response to snapshot request {} to {} ", command.snapshot_name, message.getFrom());
-            MessagingService.instance().sendReply(response, id, message.getFrom());
+            MessagingService.instance().sendReply(new MessageOut(StorageService.Verb.REQUEST_RESPONSE), id, message.getFrom());
         }
         catch (Exception ex)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 73c2965..e393590 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -34,7 +34,6 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Multimap;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -291,8 +290,7 @@ public class StorageProxy implements StorageProxyMBean
     throws IOException, TimeoutException
     {
         // Multimap that holds onto all the messages and addresses meant for a specific datacenter
-        Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(targets.size());
-        MessageProducer producer = new CachingMessageProducer(rm);
+        Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>(targets.size());
 
         for (InetAddress destination : targets)
         {
@@ -320,14 +318,14 @@ public class StorageProxy implements StorageProxyMBean
                         logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
 
                     String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
-                    Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+                    Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
                     if (messages == null)
                     {
                        messages = HashMultimap.create();
                        dcMessages.put(dc, messages);
                     }
 
-                    messages.put(producer.getMessage(Gossiper.instance.getVersion(destination)), destination);
+                    messages.put(rm.createMessage(), destination);
                 }
             }
             else
@@ -347,7 +345,6 @@ public class StorageProxy implements StorageProxyMBean
                                                  final InetAddress target,
                                                  final IWriteResponseHandler responseHandler,
                                                  final ConsistencyLevel consistencyLevel)
-    throws IOException
     {
         // Hint of itself doesn't make sense.
         assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
@@ -390,20 +387,20 @@ public class StorageProxy implements StorageProxyMBean
     /**
      * for each datacenter, send a message to one node to relay the write to other replicas
      */
-    private static void sendMessages(String localDataCenter, Map<String, Multimap<Message, InetAddress>> dcMessages, IWriteResponseHandler handler)
+    private static void sendMessages(String localDataCenter, Map<String, Multimap<MessageOut, InetAddress>> dcMessages, IWriteResponseHandler handler)
     throws IOException
     {
-        for (Map.Entry<String, Multimap<Message, InetAddress>> entry: dcMessages.entrySet())
+        for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet())
         {
             String dataCenter = entry.getKey();
 
             // send the messages corresponding to this datacenter
-            for (Map.Entry<Message, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
+            for (Map.Entry<MessageOut, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
             {
-                Message message = messages.getKey();
+                MessageOut message = messages.getKey();
                 // a single message object is used for unhinted writes, so clean out any forwards
                 // from previous loop iterations
-                message = message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
+                message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
                 Iterator<InetAddress> iter = messages.getValue().iterator();
                 InetAddress target = iter.next();
 
@@ -434,7 +431,7 @@ public class StorageProxy implements StorageProxyMBean
                     if (logger.isDebugEnabled())
                         logger.debug("Adding FWD message to: " + destination + " with ID " + id);
                 }
-                message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray());
+                message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
                 // send the combined message + forward headers
                 String id = MessagingService.instance().sendRR(message, target, handler);
                 if (logger.isDebugEnabled())
@@ -492,10 +489,9 @@ public class StorageProxy implements StorageProxyMBean
             // Forward the actual update to the chosen leader replica
             IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
 
-            Message message = cm.makeMutationMessage(Gossiper.instance.getVersion(endpoint));
             if (logger.isDebugEnabled())
                 logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint);
-            MessagingService.instance().sendRR(message, endpoint, responseHandler);
+            MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler);
             return responseHandler;
         }
     }
@@ -673,7 +669,7 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     logger.debug("reading data from {}", dataPoint);
-                    MessagingService.instance().sendRR(command, dataPoint, handler);
+                    MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
                 }
 
                 if (handler.endpoints.size() == 1)
@@ -682,7 +678,7 @@ public class StorageProxy implements StorageProxyMBean
                 // send the other endpoints a digest request
                 ReadCommand digestCommand = command.copy();
                 digestCommand.setDigestQuery(true);
-                MessageProducer producer = null;
+                MessageOut message = null;
                 for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
                 {
                     if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
@@ -695,9 +691,9 @@ public class StorageProxy implements StorageProxyMBean
                         logger.debug("reading digest from {}", digestPoint);
                         // (We lazy-construct the digest Message object since it may not be necessary if we
                         // are doing a local digest read, or no digest reads at all.)
-                        if (producer == null)
-                            producer = new CachingMessageProducer(digestCommand);
-                        MessagingService.instance().sendRR(producer, digestPoint, handler);
+                        if (message == null)
+                            message = digestCommand.createMessage();
+                        MessagingService.instance().sendRR(message, digestPoint, handler);
                     }
                 }
             }
@@ -743,9 +739,11 @@ public class StorageProxy implements StorageProxyMBean
                     repairCommands.add(command);
                     repairResponseHandlers.add(repairHandler);
 
-                    MessageProducer producer = new CachingMessageProducer(command);
                     for (InetAddress endpoint : handler.endpoints)
-                        MessagingService.instance().sendRR(producer, endpoint, repairHandler);
+                    {
+                        MessageOut<ReadCommand> message = command.createMessage();
+                        MessagingService.instance().sendRR(message, endpoint, repairHandler);
+                    }
                 }
             }
 
@@ -885,9 +883,10 @@ public class StorageProxy implements StorageProxyMBean
                     ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
                     handler.assureSufficientLiveNodes();
                     resolver.setSources(handler.endpoints);
+                    MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
                     for (InetAddress endpoint : handler.endpoints)
                     {
-                        MessagingService.instance().sendRR(nodeCmd, endpoint, handler);
+                        MessagingService.instance().sendRR(message, endpoint, handler);
                         if (logger.isDebugEnabled())
                             logger.debug("reading " + nodeCmd + " from " + endpoint);
                     }
@@ -965,14 +964,9 @@ public class StorageProxy implements StorageProxyMBean
             }
         };
         // an empty message acts as a request to the SchemaCheckVerbHandler.
+        MessageOut message = new MessageOut(StorageService.Verb.SCHEMA_CHECK);
         for (InetAddress endpoint : liveHosts)
-        {
-            Message message = new Message(FBUtilities.getBroadcastAddress(),
-                                          StorageService.Verb.SCHEMA_CHECK,
-                                          ArrayUtils.EMPTY_BYTE_ARRAY,
-                                          Gossiper.instance.getVersion(endpoint));
             MessagingService.instance().sendRR(message, endpoint, cb);
-        }
 
         try
         {
@@ -1204,9 +1198,9 @@ public class StorageProxy implements StorageProxyMBean
         // Send out the truncate calls and track the responses with the callbacks.
         logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
         final Truncation truncation = new Truncation(keyspace, cfname);
-        MessageProducer producer = new CachingMessageProducer(truncation);
+        MessageOut<Truncation> message = truncation.createMessage();
         for (InetAddress endpoint : allEndpoints)
-            MessagingService.instance().sendRR(producer, endpoint, responseHandler);
+            MessagingService.instance().sendRR(message, endpoint, responseHandler);
 
         // Wait for all
         logger.debug("Sent all truncate messages, now waiting for {} responses", blockFor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7d651d4..d8b4d34 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -56,10 +56,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.IAsyncResult;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.ResponseVerbHandler;
+import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.*;
@@ -1505,13 +1502,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     /**
      * Sends a notification to a node indicating we have finished replicating data.
      *
-     * @param local the local address
      * @param remote node to send notification to
      */
-    private void sendReplicationNotification(InetAddress local, InetAddress remote)
+    private void sendReplicationNotification(InetAddress remote)
     {
         // notify the remote token
-        Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0], Gossiper.instance.getVersion(remote));
+        MessageOut msg = new MessageOut(StorageService.Verb.REPLICATION_FINISHED);
         IFailureDetector failureDetector = FailureDetector.instance;
         if (logger.isDebugEnabled())
             logger.debug("Notifying " + remote.toString() + " of replication completion\n");
@@ -1578,7 +1574,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                         {
                             fetchSources.remove(source, table);
                             if (fetchSources.isEmpty())
-                                sendReplicationNotification(myAddress, notifyEndpoint);
+                                sendReplicationNotification(notifyEndpoint);
                         }
                     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java b/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
index c4583d9..67e3b66 100644
--- a/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
@@ -17,12 +17,12 @@
  */
 package org.apache.cassandra.streaming;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 
@@ -33,7 +33,7 @@ public class ReplicationFinishedVerbHandler implements IVerbHandler
     public void doVerb(Message msg, String id)
     {
         StorageService.instance.confirmReplication(msg.getFrom());
-        Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY, msg.getVersion());
+        MessageOut response = new MessageOut(StorageService.Verb.INTERNAL_RESPONSE);
         if (logger.isDebugEnabled())
             logger.debug("Replying to " + id + "@" + msg.getFrom());
         MessagingService.instance().sendReply(response, id, msg.getFrom());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/streaming/StreamIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java
index 52d536b..8997156 100644
--- a/src/java/org/apache/cassandra/streaming/StreamIn.java
+++ b/src/java/org/apache/cassandra/streaming/StreamIn.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.streaming;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
-import org.apache.cassandra.gms.Gossiper;
+
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +30,6 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -65,8 +64,7 @@ public class StreamIn
                                                             columnFamilies,
                                                             session.getSessionId(),
                                                             type);
-        Message message = srm.getMessage(Gossiper.instance.getVersion(source));
-        MessagingService.instance().sendOneWay(message, source);
+        MessagingService.instance().sendOneWay(srm.createMessage(), source);
     }
 
     /** Translates remote files to local files by creating a local sstable per remote sstable. */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 1e978ab..efeaee8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.OutboundTcpConnection;
 import org.apache.cassandra.utils.Pair;
 
@@ -115,7 +115,7 @@ public class StreamInSession extends AbstractStreamSession
             current = null;
         StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED);
         // send a StreamStatus message telling the source node it can delete this file
-        sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
+        sendMessage(reply.createMessage());
         logger.debug("ack {} sent for {}", reply, remoteFile);
     }
 
@@ -133,7 +133,7 @@ public class StreamInSession extends AbstractStreamSession
         logger.info("Streaming of file {} for {} failed: requesting a retry.", remoteFile, this);
         try
         {
-            sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
+            sendMessage(reply.createMessage());
         }
         catch (IOException e)
         {
@@ -142,9 +142,14 @@ public class StreamInSession extends AbstractStreamSession
         }
     }
 
-    public void sendMessage(Message message) throws IOException
+    public void sendMessage(MessageOut<StreamReply> message) throws IOException
     {
-        OutboundTcpConnection.write(message, String.valueOf(getSessionId()), new DataOutputStream(socket.getOutputStream()));
+        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
+        OutboundTcpConnection.write(message,
+                                    String.valueOf(getSessionId()),
+                                    out,
+                                    Gossiper.instance.getVersion(getHost()));
+        out.flush();
     }
 
     public void closeIfFinished() throws IOException
@@ -191,7 +196,10 @@ public class StreamInSession extends AbstractStreamSession
             try
             {
                 if (socket != null)
-                    OutboundTcpConnection.write(reply.getMessage(Gossiper.instance.getVersion(getHost())), context.right.toString(), new DataOutputStream(socket.getOutputStream()));
+                    OutboundTcpConnection.write(reply.createMessage(),
+                                                context.right.toString(),
+                                                new DataOutputStream(socket.getOutputStream()),
+                                                Gossiper.instance.getVersion(getHost()));
                 else
                     logger.debug("No socket to reply to {} with!", getHost());
             }
@@ -210,15 +218,8 @@ public class StreamInSession extends AbstractStreamSession
         sessions.remove(context);
         if (!success && FailureDetector.instance.isAlive(getHost()))
         {
-            try
-            {
-                StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE);
-                MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
-            }
-            catch (IOException ex)
-            {
-                logger.error("Error sending streaming session failure notification to " + getHost(), ex);
-            }
+            StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE);
+            MessagingService.instance().sendOneWay(reply.createMessage(), getHost());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/streaming/StreamReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java
index b3453da..269d878 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReply.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReply.java
@@ -17,16 +17,15 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
 
-class StreamReply implements MessageProducer
+class StreamReply
 {
     static enum Status
     {
@@ -49,12 +48,9 @@ class StreamReply implements MessageProducer
         this.sessionId = sessionId;
     }
 
-    public Message getMessage(Integer version) throws IOException
+    public MessageOut<StreamReply> createMessage()
     {
-        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream( bos );
-        serializer.serialize(this, dos, version);
-        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
+        return new MessageOut<StreamReply>(StorageService.Verb.STREAM_REPLY, this, serializer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java b/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
index 9c9536e..6367cc5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
@@ -17,9 +17,13 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 import com.google.common.collect.Iterables;
 
@@ -29,13 +33,10 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
 * This class encapsulates the message that needs to be sent to nodes
@@ -44,7 +45,7 @@ import org.apache.cassandra.utils.FBUtilities;
 *
 * If a file is specified, ranges and table will not. vice-versa should hold as well.
 */
-class StreamRequestMessage implements MessageProducer
+class StreamRequestMessage // TODO rename to StreamRequest
 {
     private static final IVersionedSerializer<StreamRequestMessage> serializer;
     static
@@ -91,19 +92,9 @@ class StreamRequestMessage implements MessageProducer
         columnFamilies = null;
     }
 
-    public Message getMessage(Integer version)
+    public MessageOut<StreamRequestMessage> createMessage()
     {
-        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        try
-        {
-            StreamRequestMessage.serializer().serialize(this, dos, version);
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAM_REQUEST, bos.toByteArray(), version);
+        return new MessageOut<StreamRequestMessage>(StorageService.Verb.STREAM_REQUEST, this, serializer);
     }
 
     public String toString()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index d86d2f2..977a8a8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -20,9 +20,9 @@ package org.apache.cassandra.streaming;
 import java.io.*;
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.*;
@@ -131,15 +130,11 @@ public class StreamingRepairTask implements Runnable
 
     private void forwardToSource()
     {
-        try
-        {
-            logger.info(String.format("[streaming task #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", id, ranges.size(), src, dst));
-            StreamingRepairRequest.send(this);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException("Error forwarding streaming task to " + src, e);
-        }
+        logger.info(String.format("[streaming task #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", id, ranges.size(), src, dst));
+        MessageOut<StreamingRepairTask> msg = new MessageOut<StreamingRepairTask>(StorageService.Verb.STREAMING_REPAIR_REQUEST,
+                                                                                  this,
+                                                                                  StreamingRepairTask.serializer);
+        MessagingService.instance().sendOneWay(msg, src);
     }
 
     private static IStreamCallback makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
@@ -217,15 +212,6 @@ public class StreamingRepairTask implements Runnable
             task.run();
         }
 
-        private static void send(StreamingRepairTask task) throws IOException
-        {
-            int version = Gossiper.instance.getVersion(task.src);
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream(bos);
-            StreamingRepairTask.serializer.serialize(task, dos, version);
-            Message msg = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAMING_REPAIR_REQUEST, bos.toByteArray(), version);
-            MessagingService.instance().sendOneWay(msg, task.src);
-        }
     }
 
     public static class StreamingRepairResponse implements IVerbHandler
@@ -238,7 +224,7 @@ public class StreamingRepairTask implements Runnable
             UUID taskid;
             try
             {
-                 taskid = UUIDGen.read(dis);
+                taskid = UUIDGen.serializer.deserialize(dis, message.getVersion());
             }
             catch (IOException e)
             {
@@ -262,12 +248,8 @@ public class StreamingRepairTask implements Runnable
         private static void reply(InetAddress remote, UUID taskid) throws IOException
         {
             logger.info(String.format("[streaming task #%s] task suceed, forwarding response to %s", taskid, remote));
-            int version = Gossiper.instance.getVersion(remote);
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream(bos);
-            UUIDGen.write(taskid, dos);
-            Message msg = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAMING_REPAIR_RESPONSE, bos.toByteArray(), version);
-            MessagingService.instance().sendOneWay(msg, remote);
+            MessageOut<UUID> message = new MessageOut<UUID>(StorageService.Verb.STREAMING_REPAIR_RESPONSE, taskid, UUIDGen.serializer);
+            MessagingService.instance().sendOneWay(message, remote);
         }
     }
 
@@ -275,7 +257,7 @@ public class StreamingRepairTask implements Runnable
     {
         public void serialize(StreamingRepairTask task, DataOutput dos, int version) throws IOException
         {
-            UUIDGen.write(task.id, dos);
+            UUIDGen.serializer.serialize(task.id, dos, version);
             CompactEndpointSerializationHelper.serialize(task.owner, dos);
             CompactEndpointSerializationHelper.serialize(task.src, dos);
             CompactEndpointSerializationHelper.serialize(task.dst, dos);
@@ -291,7 +273,7 @@ public class StreamingRepairTask implements Runnable
 
         public StreamingRepairTask deserialize(DataInput dis, int version) throws IOException
         {
-            UUID id = UUIDGen.read(dis);
+            UUID id = UUIDGen.serializer.deserialize(dis, version);
             InetAddress owner = CompactEndpointSerializationHelper.deserialize(dis);
             InetAddress src = CompactEndpointSerializationHelper.deserialize(dis);
             InetAddress dst = CompactEndpointSerializationHelper.deserialize(dis);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index a303284..9b56d76 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -17,17 +17,20 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.PeekingIterator;
 
+import org.apache.cassandra.db.DBConstants;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.MessagingService;
 
 /**
  * A MerkleTree implemented as a binary tree.
@@ -79,48 +82,30 @@ public class MerkleTree implements Serializable
     private long size;
     private Hashable root;
 
-    public static class MerkleTreeSerializer
+    public static class MerkleTreeSerializer implements IVersionedSerializer<MerkleTree>
     {
-        public void serialize(MerkleTree mt, DataOutputStream dos, int version) throws IOException
+        public void serialize(MerkleTree mt, DataOutput dos, int version) throws IOException
         {
-            if (version == MessagingService.VERSION_07)
-            {
-                ObjectOutputStream out = new ObjectOutputStream(dos);
-                out.writeObject(mt);
-            }
-            else
-            {
-                dos.writeByte(mt.hashdepth);
-                dos.writeLong(mt.maxsize);
-                dos.writeLong(mt.size);
-                Hashable.serializer.serialize(mt.root, dos, version);
-            }
+            dos.writeByte(mt.hashdepth);
+            dos.writeLong(mt.maxsize);
+            dos.writeLong(mt.size);
+            Hashable.serializer.serialize(mt.root, dos, version);
         }
 
-        public MerkleTree deserialize(DataInputStream dis, int version) throws IOException
+        public MerkleTree deserialize(DataInput dis, int version) throws IOException
         {
-            if (version == MessagingService.VERSION_07)
-            {
-                ObjectInputStream in = new ObjectInputStream(dis);
-                try
-                {
-                    return (MerkleTree)in.readObject();
-                }
-                catch (ClassNotFoundException ex)
-                {
-                    throw new IOException(ex);
-                }
-            }
-            else
-            {
-                byte hashdepth = dis.readByte();
-                long maxsize = dis.readLong();
-                long size = dis.readLong();
-                MerkleTree mt = new MerkleTree(null, null, hashdepth, maxsize);
-                mt.size = size;
-                mt.root = Hashable.serializer.deserialize(dis, version);
-                return mt;
-            }
+            byte hashdepth = dis.readByte();
+            long maxsize = dis.readLong();
+            long size = dis.readLong();
+            MerkleTree mt = new MerkleTree(null, null, hashdepth, maxsize);
+            mt.size = size;
+            mt.root = Hashable.serializer.deserialize(dis, version);
+            return mt;
+        }
+
+        public long serializedSize(MerkleTree merkleTree, int version)
+        {
+            return 1 + DBConstants.LONG_SIZE + DBConstants.LONG_SIZE + Hashable.serializer.serializedSize(merkleTree.root, version);
         }
     }
 
@@ -696,7 +681,7 @@ public class MerkleTree implements Serializable
             return buff.toString();
         }
 
-        private static class InnerSerializer
+        private static class InnerSerializer implements IVersionedSerializer<Inner>
         {
             public void serialize(Inner inner, DataOutput dos, int version) throws IOException
             {
@@ -723,6 +708,15 @@ public class MerkleTree implements Serializable
                 Hashable rchild = Hashable.serializer.deserialize(dis, version);
                 return new Inner(token, lchild, rchild);
             }
+
+            public long serializedSize(Inner inner, int version)
+            {
+                int size = inner.hash == null ? DBConstants.INT_SIZE : DBConstants.INT_SIZE + inner.hash.length;
+                size += Token.serializer().serializedSize(inner.token)
+                        + Hashable.serializer.serializedSize(inner.lchild, version)
+                        + Hashable.serializer.serializedSize(inner.rchild, version);
+                return size;
+            }
         }
     }
 
@@ -770,12 +764,14 @@ public class MerkleTree implements Serializable
             return "#<Leaf " + Hashable.toString(hash()) + ">";
         }
 
-        private static class LeafSerializer
+        private static class LeafSerializer implements IVersionedSerializer<Leaf>
         {
-            public void serialize(Leaf leaf, DataOutput dos) throws IOException
+            public void serialize(Leaf leaf, DataOutput dos, int version) throws IOException
             {
                 if (leaf.hash == null)
+                {
                     dos.writeInt(-1);
+                }
                 else
                 {
                     dos.writeInt(leaf.hash.length);
@@ -783,7 +779,7 @@ public class MerkleTree implements Serializable
                 }
             }
 
-            public Leaf deserialize(DataInput dis) throws IOException
+            public Leaf deserialize(DataInput dis, int version) throws IOException
             {
                 int hashLen = dis.readInt();
                 byte[] hash = hashLen < 0 ? null : new byte[hashLen];
@@ -791,6 +787,11 @@ public class MerkleTree implements Serializable
                     dis.readFully(hash);
                 return new Leaf(hash);
             }
+
+            public long serializedSize(Leaf leaf, int version)
+            {
+                return leaf.hash == null ? DBConstants.INT_SIZE : DBConstants.INT_SIZE + leaf.hash.length;
+            }
         }
     }
 
@@ -819,7 +820,7 @@ public class MerkleTree implements Serializable
     /**
      * Abstract class containing hashing logic, and containing a single hash field.
      */
-    static abstract class Hashable implements Serializable
+    static abstract class Hashable
     {
         private static final long serialVersionUID = 1L;
         private static final IVersionedSerializer<Hashable> serializer = new HashableSerializer();
@@ -893,7 +894,7 @@ public class MerkleTree implements Serializable
                 else if (h instanceof Leaf)
                 {
                     dos.writeByte(Leaf.IDENT);
-                    Leaf.serializer.serialize((Leaf)h, dos);
+                    Leaf.serializer.serialize((Leaf) h, dos, version);
                 }
                 else
                     throw new IOException("Unexpected Hashable: " + h.getClass().getCanonicalName());
@@ -905,14 +906,18 @@ public class MerkleTree implements Serializable
                 if (Inner.IDENT == ident)
                     return Inner.serializer.deserialize(dis, version);
                 else if (Leaf.IDENT == ident)
-                    return Leaf.serializer.deserialize(dis);
+                    return Leaf.serializer.deserialize(dis, version);
                 else
                     throw new IOException("Unexpected Hashable: " + ident);
             }
 
-            public long serializedSize(Hashable hashable, int version)
+            public long serializedSize(Hashable h, int version)
             {
-                throw new UnsupportedOperationException();
+                if (h instanceof Inner)
+                    return 1 + Inner.serializer.serializedSize((Inner) h, version);
+                else if (h instanceof Leaf)
+                    return 1 + Leaf.serializer.serializedSize((Leaf) h, version);
+                throw new AssertionError(h.getClass());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java
index ff10650..19f2e9b 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -27,11 +27,16 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.io.IVersionedSerializer;
+
 /**
  * The goods are here: www.ietf.org/rfc/rfc4122.txt.
  */
 public class UUIDGen
 {
+    public static UUIDSerializer serializer = new UUIDSerializer();
+
     // A grand day! millis at 00:00:00.000 15 Oct 1582.
     private static final long START_EPOCH = -12219292800000L;
     private static final long clock = new Random(System.currentTimeMillis()).nextLong();
@@ -90,17 +95,23 @@ public class UUIDGen
         return new UUID(raw.getLong(raw.position()), raw.getLong(raw.position() + 8));
     }
 
-    /** reads a uuid from an input stream. */
-    public static UUID read(DataInput dis) throws IOException
+    public static class UUIDSerializer implements IVersionedSerializer<UUID>
     {
-        return new UUID(dis.readLong(), dis.readLong());
-    }
+        public void serialize(UUID uuid, DataOutput out, int version) throws IOException
+        {
+            out.writeLong(uuid.getMostSignificantBits());
+            out.writeLong(uuid.getLeastSignificantBits());
+        }
 
-    /** writes a uuid to an output stream. */
-    public static void write(UUID uuid, DataOutput dos) throws IOException
-    {
-        dos.writeLong(uuid.getMostSignificantBits());
-        dos.writeLong(uuid.getLeastSignificantBits());
+        public UUID deserialize(DataInput in, int version) throws IOException
+        {
+            return new UUID(in.readLong(), in.readLong());
+        }
+
+        public long serializedSize(UUID uuid, int version)
+        {
+            return DBConstants.LONG_SIZE + DBConstants.LONG_SIZE;
+        }
     }
 
     /** decomposes a uuid into raw bytes. */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index e29e4ec..cf2c079 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -32,7 +32,7 @@ import java.util.Map;
 
 public class AbstractSerializationsTester extends SchemaLoader
 {
-    protected static final String CUR_VER = System.getProperty("cassandra.version", "0.7");
+    protected static final String CUR_VER = System.getProperty("cassandra.version", "1.0");
     protected static final Map<String, Integer> VERSION_MAP = new HashMap<String, Integer> ()
     {{
             put("0.7", 1);


Mime
View raw message