cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [3/5] cassandra git commit: Switch to DataInputPlus
Date Thu, 02 Jul 2015 08:40:27 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/io/util/DataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataInputPlus.java b/src/java/org/apache/cassandra/io/util/DataInputPlus.java
new file mode 100644
index 0000000..d4e25d6
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DataInputPlus.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.cassandra.utils.vint.VIntCoding;
+
+/**
+ * Extension to DataInput that provides support for reading varints
+ */
+public interface DataInputPlus extends DataInput
+{
+
+    default long readVInt() throws IOException
+    {
+        return VIntCoding.readVInt(this);
+    }
+
+    /**
+     * Think hard before opting for an unsigned encoding. Is this going to bite someone because some day
+     * they might need to pass in a sentinel value using negative numbers? Is the risk worth it
+     * to save a few bytes?
+     *
+     * Signed, not a fan of unsigned values in protocols and formats
+     */
+    default long readUnsignedVInt() throws IOException
+    {
+        return VIntCoding.readUnsignedVInt(this);
+    }
+
+    public static class ForwardingDataInput implements DataInput
+    {
+        protected final DataInput in;
+
+        public ForwardingDataInput(DataInput in)
+        {
+            this.in = in;
+        }
+
+        @Override
+        public void readFully(byte[] b) throws IOException
+        {
+            in.readFully(b);
+        }
+
+        @Override
+        public void readFully(byte[] b, int off, int len) throws IOException
+        {
+            in.readFully(b, off, len);
+        }
+
+        @Override
+        public int skipBytes(int n) throws IOException
+        {
+            return in.skipBytes(n);
+        }
+
+        @Override
+        public boolean readBoolean() throws IOException
+        {
+            return in.readBoolean();
+        }
+
+        @Override
+        public byte readByte() throws IOException
+        {
+            return in.readByte();
+        }
+
+        @Override
+        public int readUnsignedByte() throws IOException
+        {
+            return in.readUnsignedByte();
+        }
+
+        @Override
+        public short readShort() throws IOException
+        {
+            return in.readShort();
+        }
+
+        @Override
+        public int readUnsignedShort() throws IOException
+        {
+            return in.readUnsignedShort();
+        }
+
+        @Override
+        public char readChar() throws IOException
+        {
+            return in.readChar();
+        }
+
+        @Override
+        public int readInt() throws IOException
+        {
+            return in.readInt();
+        }
+
+        @Override
+        public long readLong() throws IOException
+        {
+            return in.readLong();
+        }
+
+        @Override
+        public float readFloat() throws IOException
+        {
+            return in.readFloat();
+        }
+
+        @Override
+        public double readDouble() throws IOException
+        {
+            return in.readDouble();
+        }
+
+        @Override
+        public String readLine() throws IOException
+        {
+            return in.readLine();
+        }
+
+        @Override
+        public String readUTF() throws IOException
+        {
+            return in.readUTF();
+        }
+    }
+
+    public static class DataInputPlusAdapter extends ForwardingDataInput implements DataInputPlus
+    {
+        public DataInputPlusAdapter(DataInput in)
+        {
+            super(in);
+        }
+    }
+
+    /**
+     * Wrapper around an InputStream that provides no buffering but can decode varints
+     */
+    public class DataInputStreamPlus extends DataInputStream implements DataInputPlus
+    {
+        public DataInputStreamPlus(InputStream is)
+        {
+            super(is);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index f6a3648..b46d23a 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -59,4 +59,113 @@ public interface DataOutputPlus extends DataOutput
     {
         VIntCoding.writeUnsignedVInt(i, this);
     }
+
+
+    public static class ForwardingDataOutput implements DataOutput
+    {
+        protected final DataOutput out;
+
+        public ForwardingDataOutput(DataOutput out)
+        {
+            this.out = out;
+        }
+
+        public void write(byte[] b) throws IOException
+        {
+            out.write(b);
+        }
+
+        public void write(byte[] b, int off, int len) throws IOException
+        {
+            out.write(b, off, len);
+        }
+
+        public void write(int b) throws IOException
+        {
+            out.write(b);
+        }
+
+        public void writeBoolean(boolean v) throws IOException
+        {
+            out.writeBoolean(v);
+        }
+
+        public void writeByte(int v) throws IOException
+        {
+            out.writeByte(v);
+        }
+
+        public void writeBytes(String s) throws IOException
+        {
+            out.writeBytes(s);
+        }
+
+        public void writeChar(int v) throws IOException
+        {
+            out.writeChar(v);
+        }
+
+        public void writeChars(String s) throws IOException
+        {
+            out.writeChars(s);
+        }
+
+        public void writeDouble(double v) throws IOException
+        {
+            out.writeDouble(v);
+        }
+
+        public void writeFloat(float v) throws IOException
+        {
+            out.writeFloat(v);
+        }
+
+        public void writeInt(int v) throws IOException
+        {
+            out.writeInt(v);
+        }
+
+        public void writeLong(long v) throws IOException
+        {
+            out.writeLong(v);
+        }
+
+        public void writeShort(int v) throws IOException
+        {
+            out.writeShort(v);
+        }
+
+        public void writeUTF(String s) throws IOException
+        {
+            out.writeUTF(s);
+        }
+
+    }
+
+    public static class DataOutputPlusAdapter extends ForwardingDataOutput implements DataOutputPlus
+    {
+
+        public DataOutputPlusAdapter(DataOutput out)
+        {
+            super(out);
+        }
+
+        public void write(ByteBuffer buffer) throws IOException
+        {
+            if (buffer.hasArray())
+                out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+            else
+                throw new UnsupportedOperationException("IMPLEMENT ME");
+        }
+
+        public void write(Memory memory, long offset, long length) throws IOException
+        {
+            throw new UnsupportedOperationException("IMPLEMENT ME");
+        }
+
+        public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
+        {
+            throw new UnsupportedOperationException("IMPLEMENT ME");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/io/util/FileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileDataInput.java b/src/java/org/apache/cassandra/io/util/FileDataInput.java
index d94075c..b63b750 100644
--- a/src/java/org/apache/cassandra/io/util/FileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/FileDataInput.java
@@ -18,11 +18,10 @@
 package org.apache.cassandra.io.util;
 
 import java.io.Closeable;
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public interface FileDataInput extends DataInput, Closeable
+public interface FileDataInput extends DataInputPlus, Closeable
 {
     public String getPath();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
index 4816379..edbf660 100644
--- a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.io.util;
 
 import java.io.Closeable;
-import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -43,11 +42,35 @@ import com.google.common.base.Preconditions;
  *
  * NIODataInputStream is not thread safe.
  */
-public class NIODataInputStream extends InputStream implements DataInput, Closeable
+public class NIODataInputStream extends InputStream implements DataInputPlus, Closeable
 {
     private final ReadableByteChannel rbc;
     private final ByteBuffer buf;
 
+    /*
+     *  Used when wrapping a fixed buffer of data instead of a channel
+     */
+    private static final ReadableByteChannel emptyReadableByteChannel = new ReadableByteChannel()
+    {
+
+        @Override
+        public boolean isOpen()
+        {
+            return true;
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+        }
+
+        @Override
+        public int read(ByteBuffer dst) throws IOException
+        {
+            return -1;
+        }
+
+    };
 
     public NIODataInputStream(ReadableByteChannel rbc, int bufferSize)
     {
@@ -59,6 +82,53 @@ public class NIODataInputStream extends InputStream implements DataInput, Closea
         buf.limit(0);
     }
 
+    /**
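+     * Creates a stream that reads from a fixed buffer of data instead of a channel.
+     * @param buf the buffer to read from
+     * @param duplicate Whether or not to duplicate the buffer to ensure thread safety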
+     */
+    public NIODataInputStream(ByteBuffer buf, boolean duplicate)
+    {
+        Preconditions.checkNotNull(buf);
+        Preconditions.checkArgument(buf.capacity() >= 9, "Buffer size must be large enough to accomadate a varint");
+        if (duplicate)
+            this.buf = buf.duplicate();
+        else
+            this.buf = buf;
+        this.rbc = emptyReadableByteChannel;
+    }
+
+    /*
+     * The decision to duplicate or not really needs to be conscious, since it has a real impact
+     * in terms of thread safety, so don't expose this constructor with an implicit default.
+     */
+    private NIODataInputStream(ByteBuffer buf)
+    {
+        this(buf, false);
+    }
+
+    private static ByteBuffer slice(byte buffer[], int offset, int length)
+    {
+        ByteBuffer buf = ByteBuffer.wrap(buffer);
+        if (offset > 0 || length < buf.capacity())
+        {
+            buf.position(offset);
+            buf.limit(offset + length);
+            buf = buf.slice();
+        }
+        return buf;
+    }
+
+    public NIODataInputStream(byte buffer[], int offset, int length)
+    {
+        this(slice(buffer, offset, length));
+    }
+
+    public NIODataInputStream(byte buffer[])
+    {
+        this(ByteBuffer.wrap(buffer));
+    }
+
     @Override
     public void readFully(byte[] b) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 274e47b..4b66c4e 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -18,15 +18,14 @@
 package org.apache.cassandra.net;
 
 import java.io.Closeable;
-import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
@@ -59,7 +58,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
             if (version != StreamMessage.CURRENT_VERSION)
                 throw new IOException(String.format("Received stream using protocol version %d (my version %d). Terminating connection", version, MessagingService.current_version));
 
-            DataInput input = new DataInputStream(socket.getInputStream());
+            DataInputPlus input = new DataInputStreamPlus(socket.getInputStream());
             StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version);
 
             // The initiator makes two connections, one for incoming and one for outgoing.
@@ -74,7 +73,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
             close();
         }
     }
-    
+
     @Override
     public void close()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index c325717..25f850d 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -36,7 +36,8 @@ import org.apache.cassandra.config.Config;
 import org.xerial.snappy.SnappyInputStream;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.UnknownColumnFamilyException;
-import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.NIODataInputStream;
 
 public class IncomingTcpConnection extends Thread implements Closeable
@@ -106,7 +107,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
             close();
         }
     }
-    
+
     @Override
     public void close()
     {
@@ -135,7 +136,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
         // to connect with, the other node will disconnect
         out.writeInt(MessagingService.current_version);
         out.flush();
-        DataInput in = new DataInputStream(socket.getInputStream());
+        DataInputPlus in = new DataInputStreamPlus(socket.getInputStream());
         int maxVersion = in.readInt();
         // outbound side will reconnect if necessary to upgrade version
         assert version <= MessagingService.current_version;
@@ -149,13 +150,13 @@ public class IncomingTcpConnection extends Thread implements Closeable
             logger.debug("Upgrading incoming connection to be compressed");
             if (version < MessagingService.VERSION_21)
             {
-                in = new DataInputStream(new SnappyInputStream(socket.getInputStream()));
+                in = new DataInputStreamPlus(new SnappyInputStream(socket.getInputStream()));
             }
             else
             {
                 LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
                 Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(OutboundTcpConnection.LZ4_HASH_SEED).asChecksum();
-                in = new DataInputStream(new LZ4BlockInputStream(socket.getInputStream(),
+                in = new DataInputStreamPlus(new LZ4BlockInputStream(socket.getInputStream(),
                                                                  decompressor,
                                                                  checksum));
             }
@@ -172,7 +173,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
         }
     }
 
-    private InetAddress receiveMessage(DataInput input, int version) throws IOException
+    private InetAddress receiveMessage(DataInputPlus input, int version) throws IOException
     {
         int id;
         if (version < MessagingService.VERSION_20)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 10260c2..82f4000 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.net;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collections;
@@ -27,10 +26,10 @@ import com.google.common.collect.ImmutableMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 
 public class MessageIn<T>
@@ -57,7 +56,7 @@ public class MessageIn<T>
         return new MessageIn<T>(from, payload, parameters, verb, version);
     }
 
-    public static <T2> MessageIn<T2> read(DataInput in, int version, int id) throws IOException
+    public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id) throws IOException
     {
         InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 28038b3..63e583a 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -129,18 +129,18 @@ public class MessageOut<T>
     {
         int size = CompactEndpointSerializationHelper.serializedSize(from);
 
-        size += TypeSizes.NATIVE.sizeof(verb.ordinal());
-        size += TypeSizes.NATIVE.sizeof(parameters.size());
+        size += TypeSizes.sizeof(verb.ordinal());
+        size += TypeSizes.sizeof(parameters.size());
         for (Map.Entry<String, byte[]> entry : parameters.entrySet())
         {
-            size += TypeSizes.NATIVE.sizeof(entry.getKey());
-            size += TypeSizes.NATIVE.sizeof(entry.getValue().length);
+            size += TypeSizes.sizeof(entry.getKey());
+            size += TypeSizes.sizeof(entry.getValue().length);
             size += entry.getValue().length;
         }
 
         long longSize = payloadSize(version);
         assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages
-        size += TypeSizes.NATIVE.sizeof((int) longSize);
+        size += TypeSizes.sizeof((int) longSize);
         size += longSize;
         return size;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 3f2160f..22bfdbf 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -37,10 +37,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -57,6 +55,7 @@ import org.apache.cassandra.gms.GossipDigestAck;
 import org.apache.cassandra.gms.GossipDigestAck2;
 import org.apache.cassandra.gms.GossipDigestSyn;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.ILatencySubscriber;
@@ -257,7 +256,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         public static final CallbackDeterminedSerializer instance = new CallbackDeterminedSerializer();
 
-        public Object deserialize(DataInput in, int version) throws IOException
+        public Object deserialize(DataInputPlus in, int version) throws IOException
         {
             throw new UnsupportedOperationException();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/NodePair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/NodePair.java b/src/java/org/apache/cassandra/repair/NodePair.java
index bb6be04..a73c61a 100644
--- a/src/java/org/apache/cassandra/repair/NodePair.java
+++ b/src/java/org/apache/cassandra/repair/NodePair.java
@@ -17,13 +17,13 @@
  */
 package org.apache.cassandra.repair;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.net.InetAddress;
 
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 
@@ -69,7 +69,7 @@ public class NodePair
             CompactEndpointSerializationHelper.serialize(nodePair.endpoint2, out);
         }
 
-        public NodePair deserialize(DataInput in, int version) throws IOException
+        public NodePair deserialize(DataInputPlus in, int version) throws IOException
         {
             InetAddress ep1 = CompactEndpointSerializationHelper.deserialize(in);
             InetAddress ep2 = CompactEndpointSerializationHelper.deserialize(in);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index 8382136..1dd67c7 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.UUID;
 
@@ -28,6 +27,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -104,7 +104,7 @@ public class RepairJobDesc
             AbstractBounds.tokenSerializer.serialize(desc.range, out, version);
         }
 
-        public RepairJobDesc deserialize(DataInput in, int version) throws IOException
+        public RepairJobDesc deserialize(DataInputPlus in, int version) throws IOException
         {
             UUID parentSessionId = null;
             if (version >= MessagingService.VERSION_21)
@@ -124,13 +124,13 @@ public class RepairJobDesc
             int size = 0;
             if (version >= MessagingService.VERSION_21)
             {
-                size += TypeSizes.NATIVE.sizeof(desc.parentSessionId != null);
+                size += TypeSizes.sizeof(desc.parentSessionId != null);
                 if (desc.parentSessionId != null)
                     size += UUIDSerializer.serializer.serializedSize(desc.parentSessionId, version);
             }
             size += UUIDSerializer.serializer.serializedSize(desc.sessionId, version);
-            size += TypeSizes.NATIVE.sizeof(desc.keyspace);
-            size += TypeSizes.NATIVE.sizeof(desc.columnFamily);
+            size += TypeSizes.sizeof(desc.keyspace);
+            size += TypeSizes.sizeof(desc.columnFamily);
             size += AbstractBounds.tokenSerializer.serializedSize(desc.range, version);
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
index b554500..3e47374 100644
--- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,6 +25,7 @@ import java.util.UUID;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -59,7 +59,7 @@ public class AnticompactionRequest extends RepairMessage
             }
         }
 
-        public AnticompactionRequest deserialize(DataInput in, int version) throws IOException
+        public AnticompactionRequest deserialize(DataInputPlus in, int version) throws IOException
         {
             UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version);
             int rangeCount = in.readInt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
index 6d702ce..43a8f02 100644
--- a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.repair.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.UUID;
 
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.UUIDSerializer;
 
@@ -47,7 +47,7 @@ public class CleanupMessage extends RepairMessage
             UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version);
         }
 
-        public CleanupMessage deserialize(DataInput in, int version) throws IOException
+        public CleanupMessage deserialize(DataInputPlus in, int version) throws IOException
         {
             UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version);
             return new CleanupMessage(parentRepairSession);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 37dc07c..cd1b99d 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -27,6 +26,7 @@ import java.util.UUID;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -67,7 +67,7 @@ public class PrepareMessage extends RepairMessage
             out.writeBoolean(message.isIncremental);
         }
 
-        public PrepareMessage deserialize(DataInput in, int version) throws IOException
+        public PrepareMessage deserialize(DataInputPlus in, int version) throws IOException
         {
             int cfIdCount = in.readInt();
             List<UUID> cfIds = new ArrayList<>(cfIdCount);
@@ -85,15 +85,14 @@ public class PrepareMessage extends RepairMessage
         public long serializedSize(PrepareMessage message, int version)
         {
             long size;
-            TypeSizes sizes = TypeSizes.NATIVE;
-            size = sizes.sizeof(message.cfIds.size());
+            size = TypeSizes.sizeof(message.cfIds.size());
             for (UUID cfId : message.cfIds)
                 size += UUIDSerializer.serializer.serializedSize(cfId, version);
             size += UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
-            size += sizes.sizeof(message.ranges.size());
+            size += TypeSizes.sizeof(message.ranges.size());
             for (Range<Token> r : message.ranges)
                 size += Range.tokenSerializer.serializedSize(r, version);
-            size += sizes.sizeof(message.isIncremental);
+            size += TypeSizes.sizeof(message.isIncremental);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 6b5226d..55fdb66 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.repair.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -90,7 +90,7 @@ public abstract class RepairMessage
             message.messageType.serializer.serialize(message, out, version);
         }
 
-        public RepairMessage deserialize(DataInput in, int version) throws IOException
+        public RepairMessage deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairMessage.Type messageType = RepairMessage.Type.fromByte(in.readByte());
             return messageType.serializer.deserialize(in, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
index caccc82..1b15126 100644
--- a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
@@ -17,9 +17,9 @@
  */
 package org.apache.cassandra.repair.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.repair.RepairJobDesc;
 
@@ -39,7 +39,7 @@ public class SnapshotMessage extends RepairMessage
             RepairJobDesc.serializer.serialize(message.desc, out, version);
         }
 
-        public SnapshotMessage deserialize(DataInput in, int version) throws IOException
+        public SnapshotMessage deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
             return new SnapshotMessage(desc);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
index c9548ca..35cf5d4 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -17,11 +17,11 @@
  */
 package org.apache.cassandra.repair.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.net.InetAddress;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.repair.NodePair;
 import org.apache.cassandra.repair.RepairJobDesc;
@@ -62,7 +62,7 @@ public class SyncComplete extends RepairMessage
             out.writeBoolean(message.success);
         }
 
-        public SyncComplete deserialize(DataInput in, int version) throws IOException
+        public SyncComplete deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
             NodePair nodes = NodePair.serializer.deserialize(in, version);
@@ -73,7 +73,7 @@ public class SyncComplete extends RepairMessage
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
             size += NodePair.serializer.serializedSize(message.nodes, version);
-            size += TypeSizes.NATIVE.sizeof(message.success);
+            size += TypeSizes.sizeof(message.success);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 68aaf4d..2c9799e 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -28,6 +27,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
@@ -73,7 +73,7 @@ public class SyncRequest extends RepairMessage
             }
         }
 
-        public SyncRequest deserialize(DataInput in, int version) throws IOException
+        public SyncRequest deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
             InetAddress owner = CompactEndpointSerializationHelper.deserialize(in);
@@ -90,7 +90,7 @@ public class SyncRequest extends RepairMessage
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
             size += 3 * CompactEndpointSerializationHelper.serializedSize(message.initiator);
-            size += TypeSizes.NATIVE.sizeof(message.ranges.size());
+            size += TypeSizes.sizeof(message.ranges.size());
             for (Range<Token> range : message.ranges)
                 size += AbstractBounds.tokenSerializer.serializedSize(range, version);
             return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
index 8328979..ef0c4ec 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.repair.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.utils.MerkleTree;
@@ -64,7 +64,7 @@ public class ValidationComplete extends RepairMessage
                 MerkleTree.serializer.serialize(message.tree, out, version);
         }
 
-        public ValidationComplete deserialize(DataInput in, int version) throws IOException
+        public ValidationComplete deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
             if (in.readBoolean())
@@ -81,7 +81,7 @@ public class ValidationComplete extends RepairMessage
         public long serializedSize(ValidationComplete message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
-            size += TypeSizes.NATIVE.sizeof(message.success);
+            size += TypeSizes.sizeof(message.success);
             if (message.success)
                 size += MerkleTree.serializer.serializedSize(message.tree, version);
             return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
index 43bcf23..0dfab6a 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.repair.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.repair.RepairJobDesc;
 
@@ -73,7 +73,7 @@ public class ValidationRequest extends RepairMessage
             out.writeInt(message.gcBefore);
         }
 
-        public ValidationRequest deserialize(DataInput dis, int version) throws IOException
+        public ValidationRequest deserialize(DataInputPlus dis, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(dis, version);
             return new ValidationRequest(desc, dis.readInt());
@@ -82,7 +82,7 @@ public class ValidationRequest extends RepairMessage
         public long serializedSize(ValidationRequest message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
-            size += TypeSizes.NATIVE.sizeof(message.gcBefore);
+            size += TypeSizes.sizeof(message.gcBefore);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index e82e8a4..6c25793 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
@@ -29,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -37,7 +37,6 @@ import com.google.common.util.concurrent.Futures;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.cache.*;
 import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
 import org.apache.cassandra.concurrent.Stage;
@@ -52,6 +51,7 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.ArrayBackedCachedPartition;
 import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -362,7 +362,7 @@ public class CacheService implements CacheServiceMBean
             ByteBufferUtil.writeWithLength(key.cellName, out);
         }
 
-        public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
+        public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
         {
             final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
             final ByteBuffer cellName = ByteBufferUtil.readWithLength(in);
@@ -416,7 +416,7 @@ public class CacheService implements CacheServiceMBean
             ByteBufferUtil.writeWithLength(key.key, out);
         }
 
-        public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
+        public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
         {
             final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
             final int rowsToCache = cfs.metadata.getCaching().rowCache.rowsToCache;
@@ -455,7 +455,7 @@ public class CacheService implements CacheServiceMBean
             key.desc.getFormat().getIndexSerializer(cfm, key.desc.version, SerializationHeader.forKeyCache(cfm)).serialize(entry, out);
         }
 
-        public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
+        public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException
         {
             int keyLength = input.readInt();
             if (keyLength > FBUtilities.MAX_UNSIGNED_SHORT)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 9deb313..d979c02 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -17,21 +17,15 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.*;
-
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -45,6 +39,7 @@ import org.apache.cassandra.exceptions.AlreadyExistsException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -521,7 +516,7 @@ public class MigrationManager
                 Mutation.serializer.serialize(mutation, out, version);
         }
 
-        public Collection<Mutation> deserialize(DataInput in, int version) throws IOException
+        public Collection<Mutation> deserialize(DataInputPlus in, int version) throws IOException
         {
             int count = in.readInt();
             Collection<Mutation> schema = new ArrayList<>(count);
@@ -534,7 +529,7 @@ public class MigrationManager
 
         public long serializedSize(Collection<Mutation> schema, int version)
         {
-            int size = TypeSizes.NATIVE.sizeof(schema.size());
+            int size = TypeSizes.sizeof(schema.size());
             for (Mutation mutation : schema)
                 size += Mutation.serializer.serializedSize(mutation, version);
             return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 6077166..9a5e619 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.service.paxos;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,20 +8,19 @@ package org.apache.cassandra.service.paxos;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.UUID;
 
@@ -33,6 +32,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -164,7 +164,7 @@ public class Commit
             PartitionUpdate.serializer.serialize(commit.update, out, version);
         }
 
-        public Commit deserialize(DataInput in, int version) throws IOException
+        public Commit deserialize(DataInputPlus in, int version) throws IOException
         {
             DecoratedKey key = null;
             if (version < MessagingService.VERSION_30)
@@ -177,15 +177,13 @@ public class Commit
 
         public long serializedSize(Commit commit, int version)
         {
-            TypeSizes sizes = TypeSizes.NATIVE;
-
             int size = 0;
             if (version < MessagingService.VERSION_30)
-                size += ByteBufferUtil.serializedSizeWithShortLength(commit.update.partitionKey().getKey(), sizes);
+                size += ByteBufferUtil.serializedSizeWithShortLength(commit.update.partitionKey().getKey());
 
             return size
                  + UUIDSerializer.serializer.serializedSize(commit.ballot, version)
-                 + PartitionUpdate.serializer.serializedSize(commit.update, version, sizes);
+                 + PartitionUpdate.serializer.serializedSize(commit.update, version);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
index bf07402..f843b8d 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.service.paxos;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,20 +8,19 @@ package org.apache.cassandra.service.paxos;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.UUID;
 
@@ -29,6 +28,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -81,7 +81,7 @@ public class PrepareResponse
             }
         }
 
-        public PrepareResponse deserialize(DataInput in, int version) throws IOException
+        public PrepareResponse deserialize(DataInputPlus in, int version) throws IOException
         {
             boolean success = in.readBoolean();
             Commit inProgress = Commit.serializer.deserialize(in, version);
@@ -101,14 +101,13 @@ public class PrepareResponse
 
         public long serializedSize(PrepareResponse response, int version)
         {
-            TypeSizes sizes = TypeSizes.NATIVE;
-            long size = sizes.sizeof(response.promised)
+            long size = TypeSizes.sizeof(response.promised)
                       + Commit.serializer.serializedSize(response.inProgressCommit, version);
 
             if (version < MessagingService.VERSION_30)
             {
                 size += UUIDSerializer.serializer.serializedSize(response.mostRecentCommit.ballot, version);
-                size += PartitionUpdate.serializer.serializedSize(response.mostRecentCommit.update, version, sizes);
+                size += PartitionUpdate.serializer.serializedSize(response.mostRecentCommit.update, version);
             }
             else
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 66eb220..2876f08 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -25,8 +25,8 @@ import java.util.Collection;
 import java.util.UUID;
 
 import com.google.common.base.Throwables;
-
 import com.google.common.collect.UnmodifiableIterator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,11 +42,11 @@ import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 
@@ -174,7 +174,7 @@ public class StreamReader
     public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
     {
         private final CFMetaData metadata;
-        private final DataInput in;
+        private final DataInputPlus in;
         private final SerializationHeader header;
         private final SerializationHelper helper;
 
@@ -186,7 +186,7 @@ public class StreamReader
 
         private final CounterFilteredRow counterRow;
 
-        public StreamDeserializer(CFMetaData metadata, DataInput in, Version version, SerializationHeader header)
+        public StreamDeserializer(CFMetaData metadata, DataInputPlus in, Version version, SerializationHeader header)
         {
             assert version.storeRows() : "We don't allow streaming from pre-3.0 nodes";
             this.metadata = metadata;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 0fe40cf..93726e7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -28,6 +27,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 
@@ -65,7 +65,7 @@ public class StreamRequest
                 out.writeUTF(cf);
         }
 
-        public StreamRequest deserialize(DataInput in, int version) throws IOException
+        public StreamRequest deserialize(DataInputPlus in, int version) throws IOException
         {
             String keyspace = in.readUTF();
             long repairedAt = in.readLong();
@@ -86,17 +86,17 @@ public class StreamRequest
 
         public long serializedSize(StreamRequest request, int version)
         {
-            int size = TypeSizes.NATIVE.sizeof(request.keyspace);
-            size += TypeSizes.NATIVE.sizeof(request.repairedAt);
-            size += TypeSizes.NATIVE.sizeof(request.ranges.size());
+            int size = TypeSizes.sizeof(request.keyspace);
+            size += TypeSizes.sizeof(request.repairedAt);
+            size += TypeSizes.sizeof(request.ranges.size());
             for (Range<Token> range : request.ranges)
             {
                 size += Token.serializer.serializedSize(range.left, version);
                 size += Token.serializer.serializedSize(range.right, version);
             }
-            size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size());
+            size += TypeSizes.sizeof(request.columnFamilies.size());
             for (String cf : request.columnFamilies)
-                size += TypeSizes.NATIVE.sizeof(cf);
+                size += TypeSizes.sizeof(cf);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/StreamSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSummary.java b/src/java/org/apache/cassandra/streaming/StreamSummary.java
index dc332cb..c427283 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSummary.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSummary.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.UUID;
@@ -26,6 +25,7 @@ import com.google.common.base.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -88,7 +88,7 @@ public class StreamSummary implements Serializable
             out.writeLong(summary.totalSize);
         }
 
-        public StreamSummary deserialize(DataInput in, int version) throws IOException
+        public StreamSummary deserialize(DataInputPlus in, int version) throws IOException
         {
             UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             int files = in.readInt();
@@ -99,8 +99,8 @@ public class StreamSummary implements Serializable
         public long serializedSize(StreamSummary summary, int version)
         {
             long size = UUIDSerializer.serializer.serializedSize(summary.cfId, MessagingService.current_version);
-            size += TypeSizes.NATIVE.sizeof(summary.files);
-            size += TypeSizes.NATIVE.sizeof(summary.totalSize);
+            size += TypeSizes.sizeof(summary.files);
+            size += TypeSizes.sizeof(summary.totalSize);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java b/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java
index 907a1c7..924a656 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java
@@ -17,13 +17,13 @@
  */
 package org.apache.cassandra.streaming.compress;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -61,7 +61,7 @@ public class CompressionInfo
             CompressionParameters.serializer.serialize(info.parameters, out, version);
         }
 
-        public CompressionInfo deserialize(DataInput in, int version) throws IOException
+        public CompressionInfo deserialize(DataInputPlus in, int version) throws IOException
         {
             // chunks
             int chunkCount = in.readInt();
@@ -80,11 +80,11 @@ public class CompressionInfo
         public long serializedSize(CompressionInfo info, int version)
         {
             if (info == null)
-                return TypeSizes.NATIVE.sizeof(-1);
+                return TypeSizes.sizeof(-1);
 
             // chunks
             int chunkCount = info.chunks.length;
-            long size = TypeSizes.NATIVE.sizeof(chunkCount);
+            long size = TypeSizes.sizeof(chunkCount);
             for (int i = 0; i < chunkCount; i++)
                 size += CompressionMetadata.Chunk.serializer.serializedSize(info.chunks[i], version);
             // compression params

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index b8e7979..04d65d7 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -30,6 +29,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.net.MessagingService;
@@ -165,7 +165,7 @@ public class FileMessageHeader
                 SerializationHeader.serializer.serialize(header.header, out);
         }
 
-        public FileMessageHeader deserialize(DataInput in, int version) throws IOException
+        public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
         {
             UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             int sequenceNumber = in.readInt();
@@ -193,22 +193,22 @@ public class FileMessageHeader
         public long serializedSize(FileMessageHeader header, int version)
         {
             long size = UUIDSerializer.serializer.serializedSize(header.cfId, version);
-            size += TypeSizes.NATIVE.sizeof(header.sequenceNumber);
-            size += TypeSizes.NATIVE.sizeof(header.version.toString());
+            size += TypeSizes.sizeof(header.sequenceNumber);
+            size += TypeSizes.sizeof(header.version.toString());
 
             if (version >= StreamMessage.VERSION_22)
-                size += TypeSizes.NATIVE.sizeof(header.format.name);
+                size += TypeSizes.sizeof(header.format.name);
 
-            size += TypeSizes.NATIVE.sizeof(header.estimatedKeys);
+            size += TypeSizes.sizeof(header.estimatedKeys);
 
-            size += TypeSizes.NATIVE.sizeof(header.sections.size());
+            size += TypeSizes.sizeof(header.sections.size());
             for (Pair<Long, Long> section : header.sections)
             {
-                size += TypeSizes.NATIVE.sizeof(section.left);
-                size += TypeSizes.NATIVE.sizeof(section.right);
+                size += TypeSizes.sizeof(section.left);
+                size += TypeSizes.sizeof(section.right);
             }
             size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
-            size += TypeSizes.NATIVE.sizeof(header.sstableLevel);
+            size += TypeSizes.sizeof(header.sstableLevel);
 
             if (version >= StreamMessage.VERSION_30)
                 size += SerializationHeader.serializer.serializedSize(header.header);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index fdfb32e..bce9691 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -23,6 +23,8 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
@@ -39,7 +41,7 @@ public class IncomingFileMessage extends StreamMessage
         @SuppressWarnings("resource")
         public IncomingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
         {
-            DataInputStream input = new DataInputStream(Channels.newInputStream(in));
+            DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
             FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
             StreamReader reader = header.compressionInfo == null ? new StreamReader(header, session)
                     : new CompressedStreamReader(header, session);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
index 004df18..7ce1a2a 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
@@ -23,6 +23,8 @@ import java.nio.channels.ReadableByteChannel;
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamRequest;
 import org.apache.cassandra.streaming.StreamSession;
@@ -34,7 +36,7 @@ public class PrepareMessage extends StreamMessage
     {
         public PrepareMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
         {
-            DataInput input = new DataInputStream(Channels.newInputStream(in));
+            DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
             PrepareMessage message = new PrepareMessage();
             // requests
             int numRequests = input.readInt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index 1255947..71c8c2a 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -22,6 +22,8 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.UUID;
 
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamSession;
@@ -33,7 +35,7 @@ public class ReceivedMessage extends StreamMessage
     {
         public ReceivedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
         {
-            DataInput input = new DataInputStream(Channels.newInputStream(in));
+            DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
             return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
index 29e84bf..fa9c30f 100644
--- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
@@ -22,6 +22,8 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.UUID;
 
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamSession;
@@ -33,7 +35,7 @@ public class RetryMessage extends StreamMessage
     {
         public RetryMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
         {
-            DataInput input = new DataInputStream(Channels.newInputStream(in));
+            DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
             return new RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index e8b3f82..6d807e9 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming.messages;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
@@ -25,6 +24,7 @@ import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -116,7 +116,7 @@ public class StreamInitMessage
             out.writeBoolean(message.isIncremental);
         }
 
-        public StreamInitMessage deserialize(DataInput in, int version) throws IOException
+        public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException
         {
             InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
             int sessionIndex = in.readInt();
@@ -131,12 +131,12 @@ public class StreamInitMessage
         public long serializedSize(StreamInitMessage message, int version)
         {
             long size = CompactEndpointSerializationHelper.serializedSize(message.from);
-            size += TypeSizes.NATIVE.sizeof(message.sessionIndex);
+            size += TypeSizes.sizeof(message.sessionIndex);
             size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
-            size += TypeSizes.NATIVE.sizeof(message.description);
-            size += TypeSizes.NATIVE.sizeof(message.isForOutgoing);
-            size += TypeSizes.NATIVE.sizeof(message.keepSSTableLevel);
-            size += TypeSizes.NATIVE.sizeof(message.isIncremental);
+            size += TypeSizes.sizeof(message.description);
+            size += TypeSizes.sizeof(message.isForOutgoing);
+            size += TypeSizes.sizeof(message.keepSSTableLevel);
+            size += TypeSizes.sizeof(message.isIncremental);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 9de202c..dbd489f 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
 import org.apache.cassandra.utils.obs.IBitSet;
-import org.apache.cassandra.db.TypeSizes;
 
 public class BloomFilter extends WrappedSharedCloseable implements IFilter
 {
@@ -54,7 +53,7 @@ public class BloomFilter extends WrappedSharedCloseable implements IFilter
 
     public long serializedSize()
     {
-        return serializer.serializedSize(this, TypeSizes.NATIVE);
+        return serializer.serializedSize(this);
     }
 
     // Murmur is faster than an SHA-based approach and provides as-good collision

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
index 6f80ac0..00bb153 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.obs.IBitSet;
 import org.apache.cassandra.utils.obs.OffHeapBitSet;
@@ -35,7 +36,7 @@ class BloomFilterSerializer implements ISerializer<BloomFilter>
         bf.bitset.serialize(out);
     }
 
-    public BloomFilter deserialize(DataInput in) throws IOException
+    public BloomFilter deserialize(DataInputPlus in) throws IOException
     {
         return deserialize(in, false);
     }
@@ -55,16 +56,15 @@ class BloomFilterSerializer implements ISerializer<BloomFilter>
 
     /**
      * Calculates a serialized size of the given Bloom Filter
-     * @see org.apache.cassandra.io.ISerializer#serialize(Object, org.apache.cassandra.io.util.DataOutputPlus)
-     *
      * @param bf Bloom filter to calculate serialized size
+     * @see org.apache.cassandra.io.ISerializer#serialize(Object, org.apache.cassandra.io.util.DataOutputPlus)
      *
      * @return serialized size of the given bloom filter
      */
-    public long serializedSize(BloomFilter bf, TypeSizes typeSizes)
+    public long serializedSize(BloomFilter bf)
     {
-        int size = typeSizes.sizeof(bf.hashCount); // hash count
-        size += bf.bitset.serializedSize(typeSizes);
+        int size = TypeSizes.sizeof(bf.hashCount); // hash count
+        size += bf.bitset.serializedSize();
         return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BooleanSerializer.java b/src/java/org/apache/cassandra/utils/BooleanSerializer.java
index 8f3abde..1fe7702 100644
--- a/src/java/org/apache/cassandra/utils/BooleanSerializer.java
+++ b/src/java/org/apache/cassandra/utils/BooleanSerializer.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class BooleanSerializer implements IVersionedSerializer<Boolean>
@@ -32,7 +32,7 @@ public class BooleanSerializer implements IVersionedSerializer<Boolean>
         out.writeBoolean(b);
     }
 
-    public Boolean deserialize(DataInput in, int version) throws IOException
+    public Boolean deserialize(DataInputPlus in, int version) throws IOException
     {
         return in.readBoolean();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 69915bf..65ed23c 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -323,10 +323,10 @@ public class ByteBufferUtil
         return ByteBufferUtil.read(in, length);
     }
 
-    public static int serializedSizeWithLength(ByteBuffer buffer, TypeSizes sizes)
+    public static int serializedSizeWithLength(ByteBuffer buffer)
     {
         int size = buffer.remaining();
-        return sizes.sizeof(size) + size;
+        return TypeSizes.sizeof(size) + size;
     }
 
     /* @return An unsigned short in an integer. */
@@ -345,10 +345,10 @@ public class ByteBufferUtil
         return ByteBufferUtil.read(in, readShortLength(in));
     }
 
-    public static int serializedSizeWithShortLength(ByteBuffer buffer, TypeSizes sizes)
+    public static int serializedSizeWithShortLength(ByteBuffer buffer)
     {
         int size = buffer.remaining();
-        return sizes.sizeof((short)size) + size;
+        return TypeSizes.sizeof((short)size) + size;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/BytesReadTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BytesReadTracker.java b/src/java/org/apache/cassandra/utils/BytesReadTracker.java
index f363513..0da51a5 100644
--- a/src/java/org/apache/cassandra/utils/BytesReadTracker.java
+++ b/src/java/org/apache/cassandra/utils/BytesReadTracker.java
@@ -21,10 +21,12 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.cassandra.io.util.DataInputPlus;
+
 /**
  * This class is to track bytes read from given DataInput
  */
-public class BytesReadTracker implements DataInput
+public class BytesReadTracker implements DataInputPlus
 {
 
     private long bytesRead;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
index a5c51c8..89fbe4e 100644
--- a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
+++ b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLongArray;
@@ -26,8 +25,8 @@ import com.google.common.base.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-
 import org.slf4j.Logger;
 
 public class EstimatedHistogram
@@ -330,7 +329,7 @@ public class EstimatedHistogram
             }
         }
 
-        public EstimatedHistogram deserialize(DataInput in) throws IOException
+        public EstimatedHistogram deserialize(DataInputPlus in) throws IOException
         {
             int size = in.readInt();
             long[] offsets = new long[size - 1];
@@ -343,17 +342,17 @@ public class EstimatedHistogram
             return new EstimatedHistogram(offsets, buckets);
         }
 
-        public long serializedSize(EstimatedHistogram eh, TypeSizes typeSizes)
+        public long serializedSize(EstimatedHistogram eh)
         {
             int size = 0;
 
             long[] offsets = eh.getBucketOffsets();
             long[] buckets = eh.getBuckets(false);
-            size += typeSizes.sizeof(buckets.length);
+            size += TypeSizes.sizeof(buckets.length);
             for (int i = 0; i < buckets.length; i++)
             {
-                size += typeSizes.sizeof(offsets[i == 0 ? 0 : i - 1]);
-                size += typeSizes.sizeof(buckets[i]);
+                size += TypeSizes.sizeof(offsets[i == 0 ? 0 : i - 1]);
+                size += TypeSizes.sizeof(buckets[i]);
             }
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/IntervalTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IntervalTree.java b/src/java/org/apache/cassandra/utils/IntervalTree.java
index e857ee7..f40e9a3 100644
--- a/src/java/org/apache/cassandra/utils/IntervalTree.java
+++ b/src/java/org/apache/cassandra/utils/IntervalTree.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -26,12 +25,13 @@ import java.util.*;
 import com.google.common.base.Joiner;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.AsymmetricOrdering.Op;
 
@@ -329,12 +329,12 @@ public class IntervalTree<C extends Comparable<? super C>, D, I extends Interval
          * tree is to use a custom comparator, as the comparator is *not*
          * serialized.
          */
-        public IntervalTree<C, D, I> deserialize(DataInput in, int version) throws IOException
+        public IntervalTree<C, D, I> deserialize(DataInputPlus in, int version) throws IOException
         {
             return deserialize(in, version, null);
         }
 
-        public IntervalTree<C, D, I> deserialize(DataInput in, int version, Comparator<C> comparator) throws IOException
+        public IntervalTree<C, D, I> deserialize(DataInputPlus in, int version, Comparator<C> comparator) throws IOException
         {
             try
             {
@@ -355,21 +355,16 @@ public class IntervalTree<C extends Comparable<? super C>, D, I extends Interval
             }
         }
 
-        public long serializedSize(IntervalTree<C, D, I> it, TypeSizes typeSizes, int version)
+        public long serializedSize(IntervalTree<C, D, I> it, int version)
         {
-            long size = typeSizes.sizeof(0);
+            long size = TypeSizes.sizeof(0);
             for (Interval<C, D> interval : it)
             {
-                size += pointSerializer.serializedSize(interval.min, typeSizes);
-                size += pointSerializer.serializedSize(interval.max, typeSizes);
-                size += dataSerializer.serializedSize(interval.data, typeSizes);
+                size += pointSerializer.serializedSize(interval.min);
+                size += pointSerializer.serializedSize(interval.max);
+                size += dataSerializer.serializedSize(interval.data);
             }
             return size;
         }
-
-        public long serializedSize(IntervalTree<C, D, I> it, int version)
-        {
-            return serializedSize(it, TypeSizes.NATIVE, version);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..3840622 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -91,7 +92,7 @@ public class MerkleTree implements Serializable
             Hashable.serializer.serialize(mt.root, out, version);
         }
 
-        public MerkleTree deserialize(DataInput in, int version) throws IOException
+        public MerkleTree deserialize(DataInputPlus in, int version) throws IOException
         {
             byte hashdepth = in.readByte();
             long maxsize = in.readLong();
@@ -120,9 +121,9 @@ public class MerkleTree implements Serializable
         public long serializedSize(MerkleTree mt, int version)
         {
             long size = 1 // mt.hashdepth
-                 + TypeSizes.NATIVE.sizeof(mt.maxsize)
-                 + TypeSizes.NATIVE.sizeof(mt.size)
-                 + TypeSizes.NATIVE.sizeof(mt.partitioner.getClass().getCanonicalName());
+                 + TypeSizes.sizeof(mt.maxsize)
+                 + TypeSizes.sizeof(mt.size)
+                 + TypeSizes.sizeof(mt.partitioner.getClass().getCanonicalName());
 
             // full range
             size += Token.serializer.serializedSize(mt.fullRange.left, version);
@@ -843,8 +844,8 @@ public class MerkleTree implements Serializable
             public long serializedSize(Inner inner, int version)
             {
                 int size = inner.hash == null
-                ? TypeSizes.NATIVE.sizeof(-1)
-                        : TypeSizes.NATIVE.sizeof(inner.hash().length) + inner.hash().length;
+                ? TypeSizes.sizeof(-1)
+                        : TypeSizes.sizeof(inner.hash().length) + inner.hash().length;
 
                 size += Token.serializer.serializedSize(inner.token, version)
                 + Hashable.serializer.serializedSize(inner.lchild, version)
@@ -920,8 +921,8 @@ public class MerkleTree implements Serializable
             public long serializedSize(Leaf leaf, int version)
             {
                 return leaf.hash == null
-                     ? TypeSizes.NATIVE.sizeof(-1)
-                     : TypeSizes.NATIVE.sizeof(leaf.hash().length) + leaf.hash().length;
+                     ? TypeSizes.sizeof(-1)
+                     : TypeSizes.sizeof(leaf.hash().length) + leaf.hash().length;
             }
         }
     }


Mime
View raw message