cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [4/5] git commit: ByteBuffer write() methods for serializing sstables
Date Wed, 19 Mar 2014 17:02:32 GMT
ByteBuffer write() methods for serializing sstables

Patch by benedict, reviewed by marcuse for CASSANDRA-6781


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/75508ec8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/75508ec8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/75508ec8

Branch: refs/heads/trunk
Commit: 75508ec8973972634dca2a41a12a0d1d48e7c3ae
Parents: 2a2141c
Author: belliottsmith <github@sub.laerad.com>
Authored: Wed Mar 19 17:56:01 2014 +0100
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Wed Mar 19 18:01:12 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java |   3 +-
 .../cache/SerializingCacheProvider.java         |   4 +-
 .../apache/cassandra/db/BatchlogManager.java    |  12 +-
 .../cassandra/db/ColumnFamilySerializer.java    |   7 +-
 .../org/apache/cassandra/db/ColumnIndex.java    |   6 +-
 .../apache/cassandra/db/ColumnSerializer.java   |   4 +-
 .../apache/cassandra/db/CounterMutation.java    |   3 +-
 .../org/apache/cassandra/db/DeletionInfo.java   |   4 +-
 .../org/apache/cassandra/db/DeletionTime.java   |   4 +-
 src/java/org/apache/cassandra/db/Mutation.java  |   4 +-
 .../org/apache/cassandra/db/OnDiskAtom.java     |   3 +-
 .../apache/cassandra/db/PagedRangeCommand.java  |   4 +-
 .../apache/cassandra/db/RangeSliceCommand.java  |   4 +-
 .../apache/cassandra/db/RangeSliceReply.java    |   4 +-
 .../org/apache/cassandra/db/RangeTombstone.java |   5 +-
 .../apache/cassandra/db/RangeTombstoneList.java |   4 +-
 .../org/apache/cassandra/db/ReadCommand.java    |   4 +-
 .../org/apache/cassandra/db/ReadResponse.java   |   5 +-
 src/java/org/apache/cassandra/db/Row.java       |   3 +-
 .../org/apache/cassandra/db/RowIndexEntry.java  |   4 +-
 .../org/apache/cassandra/db/RowPosition.java    |   4 +-
 .../cassandra/db/SliceByNamesReadCommand.java   |   3 +-
 .../cassandra/db/SliceFromReadCommand.java      |   4 +-
 .../apache/cassandra/db/SnapshotCommand.java    |   4 +-
 .../apache/cassandra/db/TruncateResponse.java   |   4 +-
 .../org/apache/cassandra/db/Truncation.java     |   4 +-
 .../org/apache/cassandra/db/WriteResponse.java  |   4 +-
 .../cassandra/db/commitlog/CommitLog.java       |   8 +-
 .../cassandra/db/commitlog/ReplayPosition.java  |   4 +-
 .../db/compaction/AbstractCompactedRow.java     |   3 +-
 .../db/compaction/LazilyCompactedRow.java       |   3 +-
 .../cassandra/db/composites/AbstractCType.java  |   4 +-
 .../db/composites/AbstractCellNameType.java     |   4 +-
 .../apache/cassandra/db/filter/ColumnSlice.java |   4 +-
 .../cassandra/db/filter/IDiskAtomFilter.java    |   5 +-
 .../cassandra/db/filter/NamesQueryFilter.java   |   4 +-
 .../cassandra/db/filter/SliceQueryFilter.java   |   4 +-
 .../apache/cassandra/dht/AbstractBounds.java    |   3 +-
 .../org/apache/cassandra/dht/BootStrapper.java  |   4 +-
 src/java/org/apache/cassandra/dht/Token.java    |   4 +-
 .../org/apache/cassandra/gms/EchoMessage.java   |   4 +-
 .../org/apache/cassandra/gms/EndpointState.java |   3 +-
 .../org/apache/cassandra/gms/GossipDigest.java  |   3 +-
 .../apache/cassandra/gms/GossipDigestAck.java   |   4 +-
 .../apache/cassandra/gms/GossipDigestAck2.java  |   3 +-
 .../apache/cassandra/gms/GossipDigestSyn.java   |   5 +-
 .../apache/cassandra/gms/HeartBeatState.java    |   3 +-
 .../apache/cassandra/gms/VersionedValue.java    |   3 +-
 .../apache/cassandra/io/ISSTableSerializer.java |   5 +-
 .../org/apache/cassandra/io/ISerializer.java    |   6 +-
 .../cassandra/io/IVersionedSerializer.java      |   6 +-
 .../io/compress/CompressionMetadata.java        |   3 +-
 .../io/compress/CompressionParameters.java      |   4 +-
 .../cassandra/io/sstable/IndexHelper.java       |   3 +-
 .../cassandra/io/sstable/IndexSummary.java      |   7 +-
 .../cassandra/io/sstable/SSTableReader.java     |   4 +-
 .../cassandra/io/sstable/SSTableWriter.java     |   4 +-
 .../io/sstable/metadata/CompactionMetadata.java |   4 +-
 .../metadata/IMetadataComponentSerializer.java  |   5 +-
 .../sstable/metadata/IMetadataSerializer.java   |   6 +-
 .../metadata/LegacyMetadataSerializer.java      |   3 +-
 .../io/sstable/metadata/MetadataSerializer.java |   6 +-
 .../io/sstable/metadata/StatsMetadata.java      |   4 +-
 .../io/sstable/metadata/ValidationMetadata.java |   4 +-
 .../cassandra/io/util/AbstractDataOutput.java   |  43 +-
 .../io/util/ByteBufferOutputStream.java         |  46 --
 .../io/util/ChecksummedOutputStream.java        |  56 ---
 .../cassandra/io/util/DataOutputBuffer.java     |  23 +-
 .../cassandra/io/util/DataOutputByteBuffer.java |  59 +++
 .../cassandra/io/util/DataOutputPlus.java       |  32 ++
 .../io/util/DataOutputStreamAndChannel.java     |  55 +++
 .../cassandra/io/util/DataOutputStreamPlus.java |  57 +++
 .../io/util/FastByteArrayOutputStream.java      |  11 +
 .../org/apache/cassandra/io/util/Memory.java    |  21 +-
 .../cassandra/io/util/SequentialWriter.java     |  63 ++-
 .../org/apache/cassandra/net/MessageOut.java    |   3 +-
 .../apache/cassandra/net/MessagingService.java  |   3 +-
 .../cassandra/net/OutboundTcpConnection.java    |  15 +-
 .../org/apache/cassandra/repair/NodePair.java   |   4 +-
 .../apache/cassandra/repair/RepairJobDesc.java  |   5 +-
 .../repair/messages/AnticompactionRequest.java  |  12 +-
 .../repair/messages/PrepareMessage.java         |   4 +-
 .../repair/messages/RepairMessage.java          |   4 +-
 .../cassandra/repair/messages/SyncComplete.java |   4 +-
 .../cassandra/repair/messages/SyncRequest.java  |   4 +-
 .../repair/messages/ValidationComplete.java     |   4 +-
 .../repair/messages/ValidationRequest.java      |   4 +-
 .../apache/cassandra/service/CacheService.java  |  10 +-
 .../cassandra/service/MigrationManager.java     |   4 +-
 .../cassandra/service/pager/PagingState.java    |  10 +-
 .../apache/cassandra/service/paxos/Commit.java  |   4 +-
 .../service/paxos/PrepareResponse.java          |   4 +-
 .../cassandra/streaming/ConnectionHandler.java  |  16 +-
 .../cassandra/streaming/StreamRequest.java      |   5 +-
 .../cassandra/streaming/StreamSummary.java      |   4 +-
 .../streaming/compress/CompressionInfo.java     |   3 +-
 .../streaming/messages/CompleteMessage.java     |   4 +-
 .../streaming/messages/FileMessageHeader.java   |   4 +-
 .../streaming/messages/IncomingFileMessage.java |   4 +-
 .../streaming/messages/OutgoingFileMessage.java |  13 +-
 .../streaming/messages/PrepareMessage.java      |  14 +-
 .../streaming/messages/ReceivedMessage.java     |  10 +-
 .../streaming/messages/RetryMessage.java        |  10 +-
 .../messages/SessionFailedMessage.java          |   4 +-
 .../streaming/messages/StreamInitMessage.java   |   4 +-
 .../streaming/messages/StreamMessage.java       |   8 +-
 .../cassandra/thrift/CassandraServer.java       |   2 +-
 .../apache/cassandra/tools/SSTableExport.java   |   3 +-
 .../cassandra/utils/AtomicLongArrayUpdater.java |   2 +-
 .../cassandra/utils/BloomFilterSerializer.java  |   6 +-
 .../cassandra/utils/BooleanSerializer.java      |   3 +-
 .../apache/cassandra/utils/ByteBufferUtil.java  |  77 +---
 .../cassandra/utils/EstimatedHistogram.java     |   5 +-
 .../org/apache/cassandra/utils/FBUtilities.java |   2 +-
 .../cassandra/utils/FastByteComparisons.java    | 240 ----------
 .../cassandra/utils/FastByteOperations.java     | 461 +++++++++++++++++++
 .../apache/cassandra/utils/FilterFactory.java   |   4 +-
 .../apache/cassandra/utils/IntervalTree.java    |   6 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 +-
 .../apache/cassandra/utils/PureJavaCrc32.java   |  50 +-
 .../cassandra/utils/StreamingHistogram.java     |   4 +-
 .../apache/cassandra/utils/UUIDSerializer.java  |   4 +-
 .../cassandra/AbstractSerializationsTester.java |  13 +-
 test/unit/org/apache/cassandra/Util.java        |   6 +-
 .../apache/cassandra/db/SerializationsTest.java |  15 +-
 .../cassandra/gms/SerializationsTest.java       |   5 +-
 .../cassandra/io/sstable/IndexSummaryTest.java  |  11 +-
 .../metadata/MetadataSerializerTest.java        |   3 +-
 .../cassandra/io/util/DataOutputTest.java       | 251 ++++++++++
 .../apache/cassandra/repair/ValidatorTest.java  |   4 +-
 .../cassandra/service/SerializationsTest.java   |   3 +-
 .../apache/cassandra/utils/BloomFilterTest.java |   3 +-
 .../cassandra/utils/ByteBufferUtilTest.java     |   8 +-
 .../cassandra/utils/FastByteOperationsTest.java | 162 +++++++
 .../cassandra/utils/IntervalTreeTest.java       |  14 +-
 .../apache/cassandra/utils/MerkleTreeTest.java  |   3 +-
 .../cassandra/utils/SerializationsTest.java     |   5 +-
 .../cassandra/utils/StreamingHistogramTest.java |   6 +-
 139 files changed, 1618 insertions(+), 715 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8a381ea..037b1b6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,7 @@
  * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
  * Fix race condition in Batch CLE (CASSANDRA-6860)
  * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
+ * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
 Merged from 2.0:
  * Add uuid() function (CASSANDRA-6473)
  * Omit tombstones from schema digests (CASSANDRA-6862)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index f94999e..c0718b6 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.LengthAvailableInputStream;
 import org.apache.cassandra.io.util.SequentialWriter;
@@ -280,7 +281,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
     public interface CacheSerializer<K extends CacheKey, V>
     {
-        void serialize(K key, DataOutput out) throws IOException;
+        void serialize(K key, DataOutputPlus out) throws IOException;
 
         Future<Pair<K, V>> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index 84c948e..a058872 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -18,12 +18,12 @@
 package org.apache.cassandra.cache;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 
 public class SerializingCacheProvider
@@ -36,7 +36,7 @@ public class SerializingCacheProvider
     // Package protected for tests
     static class RowCacheSerializer implements ISerializer<IRowCacheEntry>
     {
-        public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException
+        public void serialize(IRowCacheEntry entry, DataOutputPlus out) throws IOException
         {
             assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
             boolean isSentinel = entry instanceof RowCacheSentinel;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 86b7b17..8024769 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
@@ -54,7 +53,7 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
@@ -144,21 +143,20 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     private static ByteBuffer serializeMutations(Collection<Mutation> mutations)
     {
-        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(bos);
+        DataOutputBuffer buf = new DataOutputBuffer();
 
         try
         {
-            out.writeInt(mutations.size());
+            buf.writeInt(mutations.size());
             for (Mutation mutation : mutations)
-                Mutation.serializer.serialize(mutation, out, VERSION);
+                Mutation.serializer.serialize(mutation, buf, VERSION);
         }
         catch (IOException e)
         {
             throw new AssertionError(); // cannot happen.
         }
 
-        return ByteBuffer.wrap(bos.toByteArray());
+        return buf.asByteBuffer();
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index 7a1d16d..f139369 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
 
@@ -48,7 +49,7 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
      * <column count>
      * <columns, serialized individually>
     */
-    public void serialize(ColumnFamily cf, DataOutput out, int version)
+    public void serialize(ColumnFamily cf, DataOutputPlus out, int version)
     {
         try
         {
@@ -140,7 +141,7 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
         return serializedSize(cf, TypeSizes.NATIVE, version);
     }
 
-    public void serializeForSSTable(ColumnFamily cf, DataOutput out)
+    public void serializeForSSTable(ColumnFamily cf, DataOutputPlus out)
     {
         // Column families shouldn't be written directly to disk, use ColumnIndex.Builder instead
         throw new UnsupportedOperationException();
@@ -151,7 +152,7 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
         throw new UnsupportedOperationException();
     }
 
-    public void serializeCfId(UUID cfId, DataOutput out, int version) throws IOException
+    public void serializeCfId(UUID cfId, DataOutputPlus out, int version) throws IOException
     {
         UUIDSerializer.serializer.serialize(cfId, out, version);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index f00f958..ba03f51 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -27,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnIndex
@@ -62,7 +62,7 @@ public class ColumnIndex
         private OnDiskAtom firstColumn;
         private OnDiskAtom lastColumn;
         private OnDiskAtom lastBlockClosing;
-        private final DataOutput output;
+        private final DataOutputPlus output;
         private final RangeTombstone.Tracker tombstoneTracker;
         private int atomCount;
         private final ByteBuffer key;
@@ -72,7 +72,7 @@ public class ColumnIndex
 
         public Builder(ColumnFamily cf,
                        ByteBuffer key,
-                       DataOutput output)
+                       DataOutputPlus output)
         {
             assert cf != null;
             assert key != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index 3cfd900..8c22d71 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -26,6 +25,7 @@ import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -61,7 +61,7 @@ public class ColumnSerializer implements ISerializer<Cell>
         this.type = type;
     }
 
-    public void serialize(Cell cell, DataOutput out) throws IOException
+    public void serialize(Cell cell, DataOutputPlus out) throws IOException
     {
         assert !cell.name().isEmpty();
         type.cellSerializer().serialize(cell.name(), out);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 41187ac..031c001 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -280,7 +281,7 @@ public class CounterMutation implements IMutation
 
     public static class CounterMutationSerializer implements IVersionedSerializer<CounterMutation>
     {
-        public void serialize(CounterMutation cm, DataOutput out, int version) throws IOException
+        public void serialize(CounterMutation cm, DataOutputPlus out, int version) throws IOException
         {
             Mutation.serializer.serialize(cm.mutation, out, version);
             out.writeUTF(cm.consistency.name());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 3a74d52..8601bce 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.*;
 
@@ -29,6 +28,7 @@ import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ObjectSizes;
 
 /**
@@ -359,7 +359,7 @@ public class DeletionInfo implements IMeasurableMemory
             this.rtlSerializer = new RangeTombstoneList.Serializer(type);
         }
 
-        public void serialize(DeletionInfo info, DataOutput out, int version) throws IOException
+        public void serialize(DeletionInfo info, DataOutputPlus out, int version) throws IOException
         {
             DeletionTime.serializer.serialize(info.topLevel, out);
             rtlSerializer.serialize(info.ranges, out, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 39db398..2bfd0fd 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -26,6 +25,7 @@ import com.google.common.base.Objects;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ObjectSizes;
 
@@ -115,7 +115,7 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
 
     public static class Serializer implements ISerializer<DeletionTime>
     {
-        public void serialize(DeletionTime delTime, DataOutput out) throws IOException
+        public void serialize(DeletionTime delTime, DataOutputPlus out) throws IOException
         {
             out.writeInt(delTime.localDeletionTime);
             out.writeLong(delTime.markedForDeleteAt);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 3663380..47c990f 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -31,6 +30,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -262,7 +262,7 @@ public class Mutation implements IMutation
 
     public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
-        public void serialize(Mutation mutation, DataOutput out, int version) throws IOException
+        public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
             if (version < MessagingService.VERSION_20)
                 out.writeUTF(mutation.getKeyspaceName());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 0115fd5..d3c96fd 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.serializers.MarshalException;
 
 public interface OnDiskAtom
@@ -52,7 +53,7 @@ public interface OnDiskAtom
             this.type = type;
         }
 
-        public void serializeForSSTable(OnDiskAtom atom, DataOutput out) throws IOException
+        public void serializeForSSTable(OnDiskAtom atom, DataOutputPlus out) throws IOException
         {
             if (atom instanceof Cell)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
index 1a009d9..5c8f3ba 100644
--- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -29,6 +28,7 @@ import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -119,7 +119,7 @@ public class PagedRangeCommand extends AbstractRangeCommand
 
     private static class Serializer implements IVersionedSerializer<PagedRangeCommand>
     {
-        public void serialize(PagedRangeCommand cmd, DataOutput out, int version) throws IOException
+        public void serialize(PagedRangeCommand cmd, DataOutputPlus out, int version) throws IOException
         {
             out.writeUTF(cmd.keyspace);
             out.writeUTF(cmd.columnFamily);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index b8f67ba..82e892c 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -31,6 +30,7 @@ import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.pager.Pageable;
@@ -151,7 +151,7 @@ public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
 
 class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand>
 {
-    public void serialize(RangeSliceCommand sliceCommand, DataOutput out, int version) throws IOException
+    public void serialize(RangeSliceCommand sliceCommand, DataOutputPlus out, int version) throws IOException
     {
         out.writeUTF(sliceCommand.keyspace);
         out.writeUTF(sliceCommand.columnFamily);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/RangeSliceReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceReply.java b/src/java/org/apache/cassandra/db/RangeSliceReply.java
index 10667a0..5964ea8 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceReply.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceReply.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -27,6 +26,7 @@ import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -62,7 +62,7 @@ public class RangeSliceReply
 
     private static class RangeSliceReplySerializer implements IVersionedSerializer<RangeSliceReply>
     {
-        public void serialize(RangeSliceReply rsr, DataOutput out, int version) throws IOException
+        public void serialize(RangeSliceReply rsr, DataOutputPlus out, int version) throws IOException
         {
             out.writeInt(rsr.rows.size());
             for (Row row : rsr.rows)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 097a835..c865b06 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Interval;
 
@@ -127,7 +128,7 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements
          * Returns the total serialized size of said tombstones and write them
          * to {@code out} it if isn't null.
          */
-        public long writeOpenedMarker(OnDiskAtom firstColumn, DataOutput out, OnDiskAtom.Serializer atomSerializer) throws IOException
+        public long writeOpenedMarker(OnDiskAtom firstColumn, DataOutputPlus out, OnDiskAtom.Serializer atomSerializer) throws IOException
         {
             long size = 0;
             if (ranges.isEmpty())
@@ -250,7 +251,7 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements
             this.type = type;
         }
 
-        public void serializeForSSTable(RangeTombstone t, DataOutput out) throws IOException
+        public void serializeForSSTable(RangeTombstone t, DataOutputPlus out) throws IOException
         {
             type.serializer().serialize(t.min, out);
             out.writeByte(ColumnSerializer.RANGE_TOMBSTONE_MASK);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index 344c098..dd0b9a6 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -31,6 +30,7 @@ import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 
 import org.apache.cassandra.utils.ObjectSizes;
@@ -663,7 +663,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
             this.type = type;
         }
 
-        public void serialize(RangeTombstoneList tombstones, DataOutput out, int version) throws IOException
+        public void serialize(RangeTombstoneList tombstones, DataOutputPlus out, int version) throws IOException
         {
             if (tombstones == null)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index b6f954e..299693e 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -27,6 +26,7 @@ import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
@@ -130,7 +130,7 @@ public abstract class ReadCommand implements IReadCommand, Pageable
 
 class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 {
-    public void serialize(ReadCommand command, DataOutput out, int version) throws IOException
+    public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
     {
         out.writeByte(command.commandType.serializedValue);
         switch (command.commandType)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 3fe6ec4..39022a4 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -21,6 +21,7 @@ import java.io.*;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /*
@@ -67,11 +68,11 @@ public class ReadResponse
 
 class ReadResponseSerializer implements IVersionedSerializer<ReadResponse>
 {
-    public void serialize(ReadResponse response, DataOutput out, int version) throws IOException
+    public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
     {
         out.writeInt(response.isDigestQuery() ? response.digest().remaining() : 0);
         ByteBuffer buffer = response.isDigestQuery() ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        ByteBufferUtil.write(buffer, out);
+        out.write(buffer);
         out.writeBoolean(response.isDigestQuery());
         if (!response.isDigestQuery())
             Row.serializer.serialize(response.row(), out, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
index 13e6f67..0685116 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -61,7 +62,7 @@ public class Row
 
     public static class RowSerializer implements IVersionedSerializer<Row>
     {
-        public void serialize(Row row, DataOutput out, int version) throws IOException
+        public void serialize(Row row, DataOutputPlus out, int version) throws IOException
         {
             ByteBufferUtil.writeWithShortLength(row.key.key, out);
             ColumnFamily.serializer.serialize(row.cf, out, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 49dfec0..618cd61 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,6 +31,7 @@ import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ObjectSizes;
 
@@ -98,7 +98,7 @@ public class RowIndexEntry implements IMeasurableMemory
             this.type = type;
         }
 
-        public void serialize(RowIndexEntry rie, DataOutput out) throws IOException
+        public void serialize(RowIndexEntry rie, DataOutputPlus out) throws IOException
         {
             out.writeLong(rie.position);
             out.writeInt(rie.promotedSize(type));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/RowPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowPosition.java b/src/java/org/apache/cassandra/db/RowPosition.java
index cb68620..a665d62 100644
--- a/src/java/org/apache/cassandra/db/RowPosition.java
+++ b/src/java/org/apache/cassandra/db/RowPosition.java
@@ -18,12 +18,12 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -71,7 +71,7 @@ public abstract class RowPosition implements RingPosition<RowPosition>
          * token is recreated on the other side). In the other cases, we then
          * serialize the token.
          */
-        public void serialize(RowPosition pos, DataOutput out) throws IOException
+        public void serialize(RowPosition pos, DataOutputPlus out) throws IOException
         {
             Kind kind = pos.kind();
             out.writeByte(kind.ordinal());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 78531f7..b1829f3 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -74,7 +75,7 @@ public class SliceByNamesReadCommand extends ReadCommand
 
 class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 {
-    public void serialize(ReadCommand cmd, DataOutput out, int version) throws IOException
+    public void serialize(ReadCommand cmd, DataOutputPlus out, int version) throws IOException
     {
         SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
         out.writeBoolean(command.isDigestQuery());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index f6ff89a..f06b9dc 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -30,6 +29,7 @@ import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.RowDataResolver;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -130,7 +130,7 @@ public class SliceFromReadCommand extends ReadCommand
 
 class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 {
-    public void serialize(ReadCommand rm, DataOutput out, int version) throws IOException
+    public void serialize(ReadCommand rm, DataOutputPlus out, int version) throws IOException
     {
         SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
         out.writeBoolean(realRM.isDigestQuery());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/SnapshotCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SnapshotCommand.java b/src/java/org/apache/cassandra/db/SnapshotCommand.java
index 7bec637..427e9ec 100644
--- a/src/java/org/apache/cassandra/db/SnapshotCommand.java
+++ b/src/java/org/apache/cassandra/db/SnapshotCommand.java
@@ -18,10 +18,10 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
@@ -59,7 +59,7 @@ public class SnapshotCommand
 
 class SnapshotCommandSerializer implements IVersionedSerializer<SnapshotCommand>
 {
-    public void serialize(SnapshotCommand snapshot_command, DataOutput out, int version) throws IOException
+    public void serialize(SnapshotCommand snapshot_command, DataOutputPlus out, int version) throws IOException
     {
         out.writeUTF(snapshot_command.keyspace);
         out.writeUTF(snapshot_command.column_family);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/TruncateResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TruncateResponse.java b/src/java/org/apache/cassandra/db/TruncateResponse.java
index eda9955..d8f5ad2 100644
--- a/src/java/org/apache/cassandra/db/TruncateResponse.java
+++ b/src/java/org/apache/cassandra/db/TruncateResponse.java
@@ -18,10 +18,10 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
@@ -51,7 +51,7 @@ public class TruncateResponse
 
     public static class TruncateResponseSerializer implements IVersionedSerializer<TruncateResponse>
     {
-        public void serialize(TruncateResponse tr, DataOutput out, int version) throws IOException
+        public void serialize(TruncateResponse tr, DataOutputPlus out, int version) throws IOException
         {
             out.writeUTF(tr.keyspace);
             out.writeUTF(tr.columnFamily);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/Truncation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Truncation.java b/src/java/org/apache/cassandra/db/Truncation.java
index dc219d6..88742cd 100644
--- a/src/java/org/apache/cassandra/db/Truncation.java
+++ b/src/java/org/apache/cassandra/db/Truncation.java
@@ -18,10 +18,10 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
@@ -54,7 +54,7 @@ public class Truncation
 
 class TruncationSerializer implements IVersionedSerializer<Truncation>
 {
-    public void serialize(Truncation t, DataOutput out, int version) throws IOException
+    public void serialize(Truncation t, DataOutputPlus out, int version) throws IOException
     {
         out.writeUTF(t.keyspace);
         out.writeUTF(t.columnFamily);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/WriteResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java
index 83b579a..a7b108b 100644
--- a/src/java/org/apache/cassandra/db/WriteResponse.java
+++ b/src/java/org/apache/cassandra/db/WriteResponse.java
@@ -18,10 +18,10 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
@@ -39,7 +39,7 @@ public class WriteResponse
 
     public static class WriteResponseSerializer implements IVersionedSerializer<WriteResponse>
     {
-        public void serialize(WriteResponse wm, DataOutput out, int version) throws IOException
+        public void serialize(WriteResponse wm, DataOutputPlus out, int version) throws IOException
         {
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 9f2b960..6fe7f4e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -32,8 +32,7 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.ByteBufferOutputStream;
-import org.apache.cassandra.io.util.ChecksummedOutputStream;
+import org.apache.cassandra.io.util.DataOutputByteBuffer;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.PureJavaCrc32;
@@ -212,14 +211,17 @@ public class CommitLog implements CommitLogMBean
         {
             PureJavaCrc32 checksum = new PureJavaCrc32();
             final ByteBuffer buffer = alloc.getBuffer();
-            DataOutputStream dos = new DataOutputStream(new ChecksummedOutputStream(new ByteBufferOutputStream(buffer), checksum));
+            DataOutputByteBuffer dos = new DataOutputByteBuffer(buffer);
 
             // checksummed length
             dos.writeInt((int) size);
+            checksum.update(buffer, buffer.position() - 4, 4);
             buffer.putLong(checksum.getValue());
 
+            int start = buffer.position();
             // checksummed mutation
             Mutation.serializer.serialize(mutation, dos, MessagingService.current_version);
+            checksum.update(buffer, start, (int) size);
             buffer.putLong(checksum.getValue());
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index e45e6e0..31fc28e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Comparator;
 
@@ -29,6 +28,7 @@ import com.google.common.collect.Ordering;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class ReplayPosition implements Comparable<ReplayPosition>
 {
@@ -124,7 +124,7 @@ public class ReplayPosition implements Comparable<ReplayPosition>
 
     public static class ReplayPositionSerializer implements ISerializer<ReplayPosition>
     {
-        public void serialize(ReplayPosition rp, DataOutput out) throws IOException
+        public void serialize(ReplayPosition rp, DataOutputPlus out) throws IOException
         {
             out.writeLong(rp.segment);
             out.writeInt(rp.position);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 734155e..c49bee5 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -25,6 +25,7 @@ import java.security.MessageDigest;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
  * a CompactedRow is an object that takes a bunch of rows (keys + columnfamilies)
@@ -47,7 +48,7 @@ public abstract class AbstractCompactedRow implements Closeable
      *
      * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
      */
-    public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
+    public abstract RowIndexEntry write(long currentPosition, DataOutputPlus out) throws IOException;
 
     /**
      * update @param digest with the data bytes of the row (not including row key or row size).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 53be765..fbad212 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.StreamingHistogram;
 
@@ -99,7 +100,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
         ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.gcUpdaterFor(key));
     }
 
-    public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
+    public RowIndexEntry write(long currentPosition, DataOutputPlus out) throws IOException
     {
         assert !closed;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/composites/AbstractCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
index 054706d..e015379 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.composites;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
@@ -33,6 +32,7 @@ import org.apache.cassandra.db.marshal.AbstractCompositeType;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
@@ -318,7 +318,7 @@ public abstract class AbstractCType implements CType
             this.type = type;
         }
 
-        public void serialize(Composite c, DataOutput out) throws IOException
+        public void serialize(Composite c, DataOutputPlus out) throws IOException
         {
             ByteBufferUtil.writeWithShortLength(c.toByteBuffer(), out);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java b/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
index 6bba44a..191750f 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.composites;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -35,6 +34,7 @@ import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.ColumnToCollectionType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class AbstractCellNameType extends AbstractCType implements CellNameType
@@ -97,7 +97,7 @@ public abstract class AbstractCellNameType extends AbstractCType implements Cell
         // A trivial wrapped over the composite serializer
         cellSerializer = new ISerializer<CellName>()
         {
-            public void serialize(CellName c, DataOutput out) throws IOException
+            public void serialize(CellName c, DataOutputPlus out) throws IOException
             {
                 serializer().serialize(c, out);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index 821e19a..19e18bd 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableMap;
 import java.util.NavigableSet;
 
 import com.google.common.collect.AbstractIterator;
@@ -33,6 +32,7 @@ import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 import org.apache.cassandra.utils.memory.PoolAllocator;
@@ -116,7 +116,7 @@ public class ColumnSlice
             this.type = type;
         }
 
-        public void serialize(ColumnSlice cs, DataOutput out, int version) throws IOException
+        public void serialize(ColumnSlice cs, DataOutputPlus out, int version) throws IOException
         {
             ISerializer<Composite> serializer = type.serializer();
             serializer.serialize(cs.start, out);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index 8142304..dddce00 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -18,18 +18,17 @@
 package org.apache.cassandra.db.filter;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.Iterator;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 
 /**
@@ -101,7 +100,7 @@ public interface IDiskAtomFilter
             this.type = type;
         }
 
-        public void serialize(IDiskAtomFilter filter, DataOutput out, int version) throws IOException
+        public void serialize(IDiskAtomFilter filter, DataOutputPlus out, int version) throws IOException
         {
             if (filter instanceof SliceQueryFilter)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index b1745c3..d6d1332 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.filter;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -38,6 +37,7 @@ import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 
 public class NamesQueryFilter implements IDiskAtomFilter
@@ -226,7 +226,7 @@ public class NamesQueryFilter implements IDiskAtomFilter
             this.type = type;
         }
 
-        public void serialize(NamesQueryFilter f, DataOutput out, int version) throws IOException
+        public void serialize(NamesQueryFilter f, DataOutputPlus out, int version) throws IOException
         {
             out.writeInt(f.columns.size());
             ISerializer<CellName> serializer = type.cellSerializer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index f5a2603..cd283f0 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.filter;
 
 import java.nio.ByteBuffer;
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.*;
 
@@ -39,6 +38,7 @@ import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.tracing.Tracing;
 
@@ -403,7 +403,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
             this.type = type;
         }
 
-        public void serialize(SliceQueryFilter f, DataOutput out, int version) throws IOException
+        public void serialize(SliceQueryFilter f, DataOutputPlus out, int version) throws IOException
         {
             out.writeInt(f.slices.length);
             for (ColumnSlice slice : f.slices)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index 849a841..32e0818 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.Pair;
 
 public abstract class AbstractBounds<T extends RingPosition> implements Serializable
@@ -126,7 +127,7 @@ public abstract class AbstractBounds<T extends RingPosition> implements Serializ
 
     public static class AbstractBoundsSerializer implements IVersionedSerializer<AbstractBounds<?>>
     {
-        public void serialize(AbstractBounds<?> range, DataOutput out, int version) throws IOException
+        public void serialize(AbstractBounds<?> range, DataOutputPlus out, int version) throws IOException
         {
             /*
              * The first int tells us if it's a range or bounds (depending on the value) _and_ if it's tokens or keys (depending on the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index b35d222..343748b 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.dht;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
@@ -34,6 +33,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
@@ -136,7 +136,7 @@ public class BootStrapper
     {
         public static final StringSerializer instance = new StringSerializer();
 
-        public void serialize(String s, DataOutput out, int version) throws IOException
+        public void serialize(String s, DataOutputPlus out, int version) throws IOException
         {
             out.writeUTF(s);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/dht/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 771f833..0f50d42 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.dht;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -27,6 +26,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -83,7 +83,7 @@ public abstract class Token<T> implements RingPosition<Token<T>>, Serializable
 
     public static class TokenSerializer implements ISerializer<Token>
     {
-        public void serialize(Token token, DataOutput out) throws IOException
+        public void serialize(Token token, DataOutputPlus out) throws IOException
         {
             IPartitioner p = StorageService.getPartitioner();
             ByteBuffer b = p.getTokenFactory().toByteArray(token);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/gms/EchoMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EchoMessage.java b/src/java/org/apache/cassandra/gms/EchoMessage.java
index 46b572e..444278f 100644
--- a/src/java/org/apache/cassandra/gms/EchoMessage.java
+++ b/src/java/org/apache/cassandra/gms/EchoMessage.java
@@ -22,10 +22,10 @@ package org.apache.cassandra.gms;
 
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class EchoMessage
 {
@@ -33,7 +33,7 @@ public class EchoMessage
 
     public static class EchoMessageSerializer implements IVersionedSerializer<EchoMessage>
     {
-        public void serialize(EchoMessage t, DataOutput out, int version) throws IOException
+        public void serialize(EchoMessage t, DataOutputPlus out, int version) throws IOException
         {
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 3df9155..1029374 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -121,7 +122,7 @@ public class EndpointState
 
 class EndpointStateSerializer implements IVersionedSerializer<EndpointState>
 {
-    public void serialize(EndpointState epState, DataOutput out, int version) throws IOException
+    public void serialize(EndpointState epState, DataOutputPlus out, int version) throws IOException
     {
         /* serialize the HeartBeatState */
         HeartBeatState hbState = epState.getHeartBeatState();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/gms/GossipDigest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigest.java b/src/java/org/apache/cassandra/gms/GossipDigest.java
index 0191dad..471602e 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigest.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigest.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 
 /**
@@ -79,7 +80,7 @@ public class GossipDigest implements Comparable<GossipDigest>
 
 class GossipDigestSerializer implements IVersionedSerializer<GossipDigest>
 {
-    public void serialize(GossipDigest gDigest, DataOutput out, int version) throws IOException
+    public void serialize(GossipDigest gDigest, DataOutputPlus out, int version) throws IOException
     {
         CompactEndpointSerializationHelper.serialize(gDigest.endpoint, out);
         out.writeInt(gDigest.generation);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/gms/GossipDigestAck.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck.java b/src/java/org/apache/cassandra/gms/GossipDigestAck.java
index c1445dd..e3be9aa 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.gms;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.HashMap;
@@ -27,6 +26,7 @@ import java.util.Map;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 
 /**
@@ -59,7 +59,7 @@ public class GossipDigestAck
 
 class GossipDigestAckSerializer implements IVersionedSerializer<GossipDigestAck>
 {
-    public void serialize(GossipDigestAck gDigestAckMessage, DataOutput out, int version) throws IOException
+    public void serialize(GossipDigestAck gDigestAckMessage, DataOutputPlus out, int version) throws IOException
     {
         GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList, out, version);
         out.writeInt(gDigestAckMessage.epStateMap.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
index c7c81d4..4a6a06e 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 
 /**
@@ -49,7 +50,7 @@ public class GossipDigestAck2
 
 class GossipDigestAck2Serializer implements IVersionedSerializer<GossipDigestAck2>
 {
-    public void serialize(GossipDigestAck2 ack2, DataOutput out, int version) throws IOException
+    public void serialize(GossipDigestAck2 ack2, DataOutputPlus out, int version) throws IOException
     {
         out.writeInt(ack2.epStateMap.size());
         for (Map.Entry<InetAddress, EndpointState> entry : ack2.epStateMap.entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
index 7c53604..0ad67bd 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
  * This is the first message that gets sent out as a start of the Gossip protocol in a
@@ -51,7 +52,7 @@ public class GossipDigestSyn
 
 class GossipDigestSerializationHelper
 {
-    static void serialize(List<GossipDigest> gDigestList, DataOutput out, int version) throws IOException
+    static void serialize(List<GossipDigest> gDigestList, DataOutputPlus out, int version) throws IOException
     {
         out.writeInt(gDigestList.size());
         for (GossipDigest gDigest : gDigestList)
@@ -78,7 +79,7 @@ class GossipDigestSerializationHelper
 
 class GossipDigestSynSerializer implements IVersionedSerializer<GossipDigestSyn>
 {
-    public void serialize(GossipDigestSyn gDigestSynMessage, DataOutput out, int version) throws IOException
+    public void serialize(GossipDigestSyn gDigestSynMessage, DataOutputPlus out, int version) throws IOException
     {
         out.writeUTF(gDigestSynMessage.clusterId);
         out.writeUTF(gDigestSynMessage.partioner);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/gms/HeartBeatState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java b/src/java/org/apache/cassandra/gms/HeartBeatState.java
index c3b423c..b33ef92 100644
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@ -21,6 +21,7 @@ import java.io.*;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
  * HeartBeat State associated with any given endpoint.
@@ -71,7 +72,7 @@ class HeartBeatState
 
 class HeartBeatStateSerializer implements IVersionedSerializer<HeartBeatState>
 {
-    public void serialize(HeartBeatState hbState, DataOutput out, int version) throws IOException
+    public void serialize(HeartBeatState hbState, DataOutputPlus out, int version) throws IOException
     {
         out.writeInt(hbState.getGeneration());
         out.writeInt(hbState.getHeartBeatVersion());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index a7ee047..206a52b 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -252,7 +253,7 @@ public class VersionedValue implements Comparable<VersionedValue>
 
     private static class VersionedValueSerializer implements IVersionedSerializer<VersionedValue>
     {
-        public void serialize(VersionedValue value, DataOutput out, int version) throws IOException
+        public void serialize(VersionedValue value, DataOutputPlus out, int version) throws IOException
         {
             out.writeUTF(outValue(value, version));
             out.writeInt(value.version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/io/ISSTableSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ISSTableSerializer.java b/src/java/org/apache/cassandra/io/ISSTableSerializer.java
index 5e501ae..20ee352 100644
--- a/src/java/org/apache/cassandra/io/ISSTableSerializer.java
+++ b/src/java/org/apache/cassandra/io/ISSTableSerializer.java
@@ -18,21 +18,22 @@
 package org.apache.cassandra.io;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 public interface ISSTableSerializer<T>
 {
     /**
      * Serialize the specified type into the specified DataOutputStream
      * instance in the format suited for SSTables.
+     *
      * @param t type that needs to be serialized
      * @param out DataOutput into which serialization needs to happen.
      * @throws java.io.IOException
      */
-    public void serializeForSSTable(T t, DataOutput out) throws IOException;
+    public void serializeForSSTable(T t, DataOutputPlus out) throws IOException;
 
     /**
      * Deserialize into the specified DataInputStream instance in the format

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/io/ISerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ISerializer.java b/src/java/org/apache/cassandra/io/ISerializer.java
index a72d17d..7e1759c 100644
--- a/src/java/org/apache/cassandra/io/ISerializer.java
+++ b/src/java/org/apache/cassandra/io/ISerializer.java
@@ -18,20 +18,22 @@
 package org.apache.cassandra.io;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 public interface ISerializer<T>
 {
     /**
      * Serialize the specified type into the specified DataOutput instance.
+     *
+     *
      * @param t type that needs to be serialized
      * @param out DataOutput into which serialization needs to happen.
      * @throws java.io.IOException
      */
-    public void serialize(T t, DataOutput out) throws IOException;
+    public void serialize(T t, DataOutputPlus out) throws IOException;
 
     /**
      * Deserialize from the specified DataInput instance.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/io/IVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/IVersionedSerializer.java b/src/java/org/apache/cassandra/io/IVersionedSerializer.java
index dd890b6..46494e1 100644
--- a/src/java/org/apache/cassandra/io/IVersionedSerializer.java
+++ b/src/java/org/apache/cassandra/io/IVersionedSerializer.java
@@ -18,19 +18,21 @@
 package org.apache.cassandra.io;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.cassandra.io.util.DataOutputPlus;
+
 public interface IVersionedSerializer<T>
 {
     /**
      * Serialize the specified type into the specified DataOutputStream instance.
+     *
      * @param t type that needs to be serialized
      * @param out DataOutput into which serialization needs to happen.
      * @param version protocol version
      * @throws java.io.IOException
      */
-    public void serialize(T t, DataOutput out, int version) throws IOException;
+    public void serialize(T t, DataOutputPlus out, int version) throws IOException;
 
     /**
      * Deserialize into the specified DataInputStream instance.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index e75a7d7..9d7729b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.utils.Pair;
@@ -408,7 +409,7 @@ public class CompressionMetadata
 
     static class ChunkSerializer implements IVersionedSerializer<Chunk>
     {
-        public void serialize(Chunk chunk, DataOutput out, int version) throws IOException
+        public void serialize(Chunk chunk, DataOutputPlus out, int version) throws IOException
         {
             out.writeLong(chunk.offset);
             out.writeInt(chunk.length);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
index 7baaedd..3ad0879 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.io.compress;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -36,6 +35,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class CompressionParameters
 {
@@ -311,7 +311,7 @@ public class CompressionParameters
 
     static class Serializer implements IVersionedSerializer<CompressionParameters>
     {
-        public void serialize(CompressionParameters parameters, DataOutput out, int version) throws IOException
+        public void serialize(CompressionParameters parameters, DataOutputPlus out, int version) throws IOException
         {
             out.writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
             out.writeInt(parameters.otherOptions.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index 0d6a06e..72a708b 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.FileUtils;
@@ -179,7 +180,7 @@ public class IndexHelper
                 this.type = type;
             }
 
-            public void serialize(IndexInfo info, DataOutput out) throws IOException
+            public void serialize(IndexInfo info, DataOutputPlus out) throws IOException
             {
                 type.serializer().serialize(info.firstName, out);
                 type.serializer().serialize(info.lastName, out);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index e32d8db..f87f356 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.Closeable;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -29,8 +28,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.io.util.MemoryInputStream;
 import org.apache.cassandra.io.util.MemoryOutputStream;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -209,7 +208,7 @@ public class IndexSummary implements Closeable
 
     public static class IndexSummarySerializer
     {
-        public void serialize(IndexSummary t, DataOutputStream out, boolean withSamplingLevel) throws IOException
+        public void serialize(IndexSummary t, DataOutputPlus out, boolean withSamplingLevel) throws IOException
         {
             out.writeInt(t.minIndexInterval);
             out.writeInt(t.summarySize);
@@ -219,7 +218,7 @@ public class IndexSummary implements Closeable
                 out.writeInt(t.samplingLevel);
                 out.writeInt(t.sizeAtFullSampling);
             }
-            FBUtilities.copy(new MemoryInputStream(t.bytes), out, t.bytes.size());
+            out.write(t.bytes);
         }
 
         public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedMinIndexInterval, int maxIndexInterval) throws IOException


Mime
View raw message