cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [3/3] cassandra git commit: Constrain internode message buffer sizes, and improve IO class hierarchy
Date Tue, 31 Mar 2015 16:28:48 GMT
Constrain internode message buffer sizes, and improve IO class hierarchy

patch by ariel and benedict for CASSANDRA-8670


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/16499ca9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/16499ca9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/16499ca9

Branch: refs/heads/trunk
Commit: 16499ca9b0080ea4d3c4ed3bc55c753bacc3c24e
Parents: dbe909e
Author: Ariel Weisberg <ariel.wesiberg@datastax.com>
Authored: Tue Mar 31 17:28:15 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Tue Mar 31 17:28:15 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java |   4 +-
 .../org/apache/cassandra/cache/OHCProvider.java |   8 +
 .../apache/cassandra/db/BatchlogManager.java    |   2 +-
 .../org/apache/cassandra/db/SuperColumns.java   |   2 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   2 +-
 .../cassandra/db/commitlog/CommitLog.java       |   4 +-
 .../io/compress/CompressedSequentialWriter.java |   5 +-
 .../io/sstable/IndexSummaryBuilder.java         |   5 +-
 .../io/sstable/format/SSTableReader.java        |   5 +-
 .../io/sstable/format/big/BigTableWriter.java   |  15 +-
 .../io/sstable/metadata/MetadataSerializer.java |   7 +-
 .../cassandra/io/util/AbstractDataOutput.java   | 329 ---------
 .../io/util/BufferedDataOutputStreamPlus.java   | 301 +++++++++
 .../cassandra/io/util/ByteBufferDataInput.java  |   1 -
 .../cassandra/io/util/DataOutputBuffer.java     |  95 ++-
 .../cassandra/io/util/DataOutputByteBuffer.java |  59 --
 .../cassandra/io/util/DataOutputPlus.java       |  14 +-
 .../io/util/DataOutputStreamAndChannel.java     |  55 --
 .../cassandra/io/util/DataOutputStreamPlus.java | 111 ++-
 .../io/util/FastByteArrayOutputStream.java      | 266 --------
 .../org/apache/cassandra/io/util/Memory.java    |   1 +
 .../cassandra/io/util/NIODataInputStream.java   | 312 +++++++++
 .../cassandra/io/util/SafeMemoryWriter.java     | 117 +---
 .../cassandra/io/util/SequentialWriter.java     |   3 +-
 .../io/util/UnbufferedDataOutputStreamPlus.java | 374 +++++++++++
 .../io/util/WrappedDataOutputStreamPlus.java    |  68 ++
 .../cassandra/net/IncomingTcpConnection.java    |   9 +-
 .../cassandra/net/OutboundTcpConnection.java    |  10 +-
 .../apache/cassandra/service/GCInspector.java   |  46 +-
 .../cassandra/service/pager/PagingState.java    |   2 +-
 .../cassandra/streaming/ConnectionHandler.java  |  22 +-
 .../cassandra/streaming/StreamWriter.java       |  11 +-
 .../compress/CompressedStreamWriter.java        |  28 +-
 .../streaming/messages/CompleteMessage.java     |   4 +-
 .../streaming/messages/IncomingFileMessage.java |   4 +-
 .../streaming/messages/OutgoingFileMessage.java |   7 +-
 .../streaming/messages/PrepareMessage.java      |   4 +-
 .../streaming/messages/ReceivedMessage.java     |   4 +-
 .../streaming/messages/RetryMessage.java        |   4 +-
 .../messages/SessionFailedMessage.java          |   4 +-
 .../streaming/messages/StreamMessage.java       |   6 +-
 .../cassandra/thrift/CassandraServer.java       |   4 +
 .../org/apache/cassandra/tools/NodeProbe.java   |   2 +-
 .../org/apache/cassandra/tools/NodeTool.java    |   4 +-
 .../org/apache/cassandra/transport/CBUtil.java  |   4 +-
 .../cassandra/utils/memory/MemoryUtil.java      |  47 ++
 .../utils/vint/EncodedDataOutputStream.java     |   4 +-
 .../cassandra/AbstractSerializationsTester.java |   8 +-
 .../apache/cassandra/db/SerializationsTest.java |  19 +-
 .../cassandra/gms/SerializationsTest.java       |   6 +-
 .../cassandra/io/sstable/IndexSummaryTest.java  |   4 +
 .../metadata/MetadataSerializerTest.java        |   7 +-
 .../io/util/BufferedDataOutputStreamTest.java   | 391 +++++++++++
 .../cassandra/io/util/DataOutputTest.java       |  50 +-
 .../io/util/NIODataInputStreamTest.java         | 667 +++++++++++++++++++
 .../cassandra/service/SerializationsTest.java   |   5 +-
 .../apache/cassandra/utils/BloomFilterTest.java |  11 +-
 .../cassandra/utils/SerializationsTest.java     |   6 +-
 59 files changed, 2605 insertions(+), 965 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index beb05ab..22bdc5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670) 
  * New tool added to validate all sstables in a node (CASSANDRA-5791)
  * Push notification when tracing completes for an operation (CASSANDRA-7807)
  * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7c7e06a..7a9c3da 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -259,7 +259,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                         try
                         {
                             stream = streamFactory.getOutputStream(writerPath);
-                            writer = new DataOutputStreamPlus(stream);
+                            writer = new WrappedDataOutputStreamPlus(stream);
                         }
                         catch (FileNotFoundException e)
                         {
@@ -334,7 +334,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     if (!file.isFile())
                         continue; // someone's been messing with our directory.  naughty!
 
-                    if (file.getName().endsWith(cacheNameFormat) 
+                    if (file.getName().endsWith(cacheNameFormat)
                      || file.getName().endsWith(cacheType.toString()))
                     {
                         if (!file.delete())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java
index 720121c..95c323a 100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@ -21,9 +21,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.util.Iterator;
 import java.util.UUID;
 
+import com.google.common.base.Function;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.TypeSizes;
@@ -270,5 +273,10 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
         {
             throw new UnsupportedOperationException("IMPLEMENT ME");
         }
+
+        public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
+        {
+            throw new UnsupportedOperationException("IMPLEMENT ME");
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 8eaea52..f5137fd 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -161,7 +161,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             throw new AssertionError(); // cannot happen.
         }
 
-        return buf.asByteBuffer();
+        return buf.buffer();
     }
 
     private void replayAllFailedBatches() throws ExecutionException, InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
index 2006cbd..65e153f 100644
--- a/src/java/org/apache/cassandra/db/SuperColumns.java
+++ b/src/java/org/apache/cassandra/db/SuperColumns.java
@@ -186,7 +186,7 @@ public class SuperColumns
             {
                 // Note that, because the filter in argument is the one from thrift, 'name' are SimpleDenseCellName.
                 // So calling name.slice() would be incorrect, as simple cell names don't handle the EOC properly.
-                // This is why we call toByteBuffer() and rebuild a  Composite of the right type before call slice().
+                // This is why we call toByteBuffer() and rebuild a Composite of the right type before calling slice().
                 slices[i++] = type.make(name.toByteBuffer()).slice();
             }
             return new SliceQueryFilter(slices, false, slices.length, 1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9fa3c6b..af18b20 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1008,7 +1008,7 @@ public final class SystemKeyspace
         {
             DataOutputBuffer out = new DataOutputBuffer();
             Range.tokenSerializer.serialize(range, out, MessagingService.VERSION_30);
-            return out.asByteBuffer();
+            return out.buffer();
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 664e38e..7fa7575 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.DataOutputByteBuffer;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -251,7 +251,7 @@ public class CommitLog implements CommitLogMBean
         {
             ICRC32 checksum = CRC32Factory.instance.create();
             final ByteBuffer buffer = alloc.getBuffer();
-            DataOutputByteBuffer dos = new DataOutputByteBuffer(buffer);
+            BufferedDataOutputStreamPlus dos = new BufferedDataOutputStreamPlus(null, buffer);
 
             // checksummed length
             dos.writeInt((int) size);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index fc679d5..eb9dcf8 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.io.compress;
 
+import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.util.zip.Adler32;
 
 import org.apache.cassandra.io.FSReadError;
@@ -29,7 +31,6 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.utils.FBUtilities;
@@ -79,7 +80,7 @@ public class CompressedSequentialWriter extends SequentialWriter
         metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
 
         this.sstableMetadataCollector = sstableMetadataCollector;
-        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStreamAndChannel(channel));
+        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 696bbf8..c7c51e5 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
+import java.io.IOException;
 import java.nio.ByteOrder;
 import java.util.Map;
 import java.util.TreeMap;
@@ -151,7 +152,7 @@ public class IndexSummaryBuilder implements AutoCloseable
         return lastReadableBoundary;
     }
 
-    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart)
+    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart) throws IOException
     {
         return maybeAddEntry(decoratedKey, indexStart, 0, 0);
     }
@@ -164,7 +165,7 @@ public class IndexSummaryBuilder implements AutoCloseable
      * @param dataEnd the position in the data file we need to be able to read to (exclusive) to read this record
      *                a value of 0 indicates we are not tracking readable boundaries
      */
-    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd)
+    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd) throws IOException
     {
         if (keysWritten == nextSamplePosition)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index a27adf6..f6cd9b5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.RefCounted;
 import org.apache.cassandra.utils.concurrent.SelfRefCounted;
 
 import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
@@ -863,10 +862,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         if (summariesFile.exists())
             FileUtils.deleteWithConfirm(summariesFile);
 
-        DataOutputStreamAndChannel oStream = null;
+        DataOutputStreamPlus oStream = null;
         try
         {
-            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
+            oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));
             IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
             ByteBufferUtil.writeWithLength(first.getKey(), oStream);
             ByteBufferUtil.writeWithLength(last.getKey(), oStream);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 4a981ce..88cb067 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
@@ -107,7 +106,7 @@ public class BigTableWriter extends SSTableWriter
         return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
     }
 
-    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index)
+    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index) throws IOException
     {
         metadataCollector.addKey(decoratedKey.getKey());
         lastWrittenKey = decoratedKey;
@@ -134,15 +133,15 @@ public class BigTableWriter extends SSTableWriter
             entry = row.write(startPosition, dataFile);
             if (entry == null)
                 return null;
+            long endPosition = dataFile.getFilePointer();
+            metadataCollector.update(endPosition - startPosition, row.columnStats());
+            afterAppend(row.key, endPosition, entry);
+            return entry;
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, dataFile.getPath());
         }
-        long endPosition = dataFile.getFilePointer();
-        metadataCollector.update(endPosition - startPosition, row.columnStats());
-        afterAppend(row.key, endPosition, entry);
-        return entry;
     }
 
     public void append(DecoratedKey decoratedKey, ColumnFamily cf)
@@ -504,7 +503,7 @@ public class BigTableWriter extends SSTableWriter
             return summary.getLastReadableBoundary();
         }
 
-        public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd)
+        public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) throws IOException
         {
             bf.add(key);
             long indexStart = indexFile.getFilePointer();
@@ -545,7 +544,7 @@ public class BigTableWriter extends SSTableWriter
                 {
                     // bloom filter
                     FileOutputStream fos = new FileOutputStream(path);
-                    DataOutputStreamPlus stream = new DataOutputStreamPlus(new BufferedOutputStream(fos));
+                    DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos);
                     FilterFactory.serialize(bf, stream);
                     stream.flush();
                     fos.getFD().sync();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 0dcd981..2be69ab 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -22,15 +22,16 @@ import java.util.*;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -148,7 +149,7 @@ public class MetadataSerializer implements IMetadataSerializer
     {
         Descriptor tmpDescriptor = descriptor.asType(Descriptor.Type.TEMP);
 
-        try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
+        try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
         {
             serialize(currentComponents, out);
             out.flush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
deleted file mode 100644
index 8f4bed8..0000000
--- a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public abstract class AbstractDataOutput extends OutputStream implements DataOutputPlus
-{
-    /*
-    !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
-    */
-
-    /**
-     * Writes the entire contents of the byte array <code>buffer</code> to
-     * this RandomAccessFile starting at the current file pointer.
-     * 
-     * @param buffer
-     *            the buffer to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs trying to write to this RandomAccessFile.
-     */
-    public void write(byte[] buffer) throws IOException {
-        write(buffer, 0, buffer.length);
-    }
-
-    /**
-     * Writes <code>count</code> bytes from the byte array <code>buffer</code>
-     * starting at <code>offset</code> to this RandomAccessFile starting at
-     * the current file pointer..
-     * 
-     * @param buffer
-     *            the bytes to be written
-     * @param offset
-     *            offset in buffer to get bytes
-     * @param count
-     *            number of bytes in buffer to write
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             RandomAccessFile.
-     * @throws IndexOutOfBoundsException
-     *             If offset or count are outside of bounds.
-     */
-    public abstract void write(byte[] buffer, int offset, int count) throws IOException;
-
-    /**
-     * Writes the specified byte <code>oneByte</code> to this RandomAccessFile
-     * starting at the current file pointer. Only the low order byte of
-     * <code>oneByte</code> is written.
-     * 
-     * @param oneByte
-     *            the byte to be written
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             RandomAccessFile.
-     */
-    public abstract void write(int oneByte) throws IOException;
-
-    /**
-     * Writes a boolean to this output stream.
-     * 
-     * @param val
-     *            the boolean value to write to the OutputStream
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeBoolean(boolean val) throws IOException {
-        write(val ? 1 : 0);
-    }
-
-    /**
-     * Writes a 8-bit byte to this output stream.
-     * 
-     * @param val
-     *            the byte value to write to the OutputStream
-     * 
-     * @throws java.io.IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeByte(int val) throws IOException {
-        write(val & 0xFF);
-    }
-
-    /**
-     * Writes the low order 8-bit bytes from a String to this output stream.
-     * 
-     * @param str
-     *            the String containing the bytes to write to the OutputStream
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeBytes(String str) throws IOException {
-        byte bytes[] = new byte[str.length()];
-        for (int index = 0; index < str.length(); index++) {
-            bytes[index] = (byte) (str.charAt(index) & 0xFF);
-        }
-        write(bytes);
-    }
-
-    /**
-     * Writes the specified 16-bit character to the OutputStream. Only the lower
-     * 2 bytes are written with the higher of the 2 bytes written first. This
-     * represents the Unicode value of val.
-     * 
-     * @param val
-     *            the character to be written
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeChar(int val) throws IOException {
-        write((val >>> 8) & 0xFF);
-        write((val >>> 0) & 0xFF);
-    }
-
-    /**
-     * Writes the specified 16-bit characters contained in str to the
-     * OutputStream. Only the lower 2 bytes of each character are written with
-     * the higher of the 2 bytes written first. This represents the Unicode
-     * value of each character in str.
-     * 
-     * @param str
-     *            the String whose characters are to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeChars(String str) throws IOException {
-        byte newBytes[] = new byte[str.length() * 2];
-        for (int index = 0; index < str.length(); index++) {
-            int newIndex = index == 0 ? index : index * 2;
-            newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF);
-            newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF);
-        }
-        write(newBytes);
-    }
-
-    /**
-     * Writes a 64-bit double to this output stream. The resulting output is the
-     * 8 bytes resulting from calling Double.doubleToLongBits().
-     * 
-     * @param val
-     *            the double to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeDouble(double val) throws IOException {
-        writeLong(Double.doubleToLongBits(val));
-    }
-
-    /**
-     * Writes a 32-bit float to this output stream. The resulting output is the
-     * 4 bytes resulting from calling Float.floatToIntBits().
-     * 
-     * @param val
-     *            the float to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeFloat(float val) throws IOException {
-        writeInt(Float.floatToIntBits(val));
-    }
-
-    /**
-     * Writes a 32-bit int to this output stream. The resulting output is the 4
-     * bytes, highest order first, of val.
-     * 
-     * @param val
-     *            the int to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public void writeInt(int val) throws IOException {
-        write((val >>> 24) & 0xFF);
-        write((val >>> 16) & 0xFF);
-        write((val >>>  8) & 0xFF);
-        write((val >>> 0) & 0xFF);
-    }
-
-    /**
-     * Writes a 64-bit long to this output stream. The resulting output is the 8
-     * bytes, highest order first, of val.
-     * 
-     * @param val
-     *            the long to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public void writeLong(long val) throws IOException {
-        write((int)(val >>> 56) & 0xFF);
-        write((int)(val >>> 48) & 0xFF);
-        write((int)(val >>> 40) & 0xFF);
-        write((int)(val >>> 32) & 0xFF);
-        write((int)(val >>> 24) & 0xFF);
-        write((int)(val >>> 16) & 0xFF);
-        write((int)(val >>>  8) & 0xFF);
-        write((int) (val >>> 0) & 0xFF);
-    }
-
-    /**
-     * Writes the specified 16-bit short to the OutputStream. Only the lower 2
-     * bytes are written with the higher of the 2 bytes written first.
-     * 
-     * @param val
-     *            the short to be written
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public void writeShort(int val) throws IOException {
-        writeChar(val);
-    }
-
-    /**
-     * Writes the specified String out in UTF format.
-     * 
-     * @param str
-     *            the String to be written in UTF format.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeUTF(String str) throws IOException {
-        int utfCount = 0, length = str.length();
-        for (int i = 0; i < length; i++) {
-            int charValue = str.charAt(i);
-            if (charValue > 0 && charValue <= 127) {
-                utfCount++;
-            } else if (charValue <= 2047) {
-                utfCount += 2;
-            } else {
-                utfCount += 3;
-            }
-        }
-        if (utfCount > 65535) {
-            throw new UTFDataFormatException(); //$NON-NLS-1$
-        }
-        byte utfBytes[] = new byte[utfCount + 2];
-        int utfIndex = 2;
-        for (int i = 0; i < length; i++) {
-            int charValue = str.charAt(i);
-            if (charValue > 0 && charValue <= 127) {
-                utfBytes[utfIndex++] = (byte) charValue;
-            } else if (charValue <= 2047) {
-                utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6)));
-                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
-            } else {
-                utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12)));
-                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6)));
-                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
-            }
-        }
-        utfBytes[0] = (byte) (utfCount >> 8);
-        utfBytes[1] = (byte) utfCount;
-        write(utfBytes);
-    }
-
-    private byte[] buf;
-    public synchronized void write(ByteBuffer buffer) throws IOException
-    {
-        int len = buffer.remaining();
-        if (len < 16)
-        {
-            int offset = buffer.position();
-            for (int i = 0 ; i < len ; i++)
-                write(buffer.get(i + offset));
-            return;
-        }
-
-        byte[] buf = this.buf;
-        if (buf == null)
-            this.buf = buf = new byte[256];
-
-        int offset = 0;
-        while (len > 0)
-        {
-            int sublen = Math.min(buf.length, len);
-            ByteBufferUtil.arrayCopy(buffer, buffer.position() + offset, buf, 0, sublen);
-            write(buf, 0, sublen);
-            offset += sublen;
-            len -= sublen;
-        }
-    }
-
-    public void write(Memory memory, long offset, long length) throws IOException
-    {
-        for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
-            write(buffer);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
new file mode 100644
index 0000000..f4f46a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+
+/**
+ * An implementation of the DataOutputStreamPlus interface using a ByteBuffer to stage writes
+ * before flushing them to an underlying channel.
+ *
+ * This class is completely thread unsafe.
+ */
+public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
+{
+    private static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "nio_data_output_stream_plus_buffer_size", 1024 * 32);
+
+    // Staging buffer; null once closed. Not final because subclasses (e.g. DataOutputBuffer) replace it to grow.
+    ByteBuffer buffer;
+
+    public BufferedDataOutputStreamPlus(RandomAccessFile ras)
+    {
+        this(ras.getChannel());
+    }
+
+    public BufferedDataOutputStreamPlus(RandomAccessFile ras, int bufferSize)
+    {
+        this(ras.getChannel(), bufferSize);
+    }
+
+    public BufferedDataOutputStreamPlus(FileOutputStream fos)
+    {
+        this(fos.getChannel());
+    }
+
+    public BufferedDataOutputStreamPlus(FileOutputStream fos, int bufferSize)
+    {
+        this(fos.getChannel(), bufferSize);
+    }
+
+    public BufferedDataOutputStreamPlus(WritableByteChannel wbc)
+    {
+        this(wbc, DEFAULT_BUFFER_SIZE);
+    }
+
+    public BufferedDataOutputStreamPlus(WritableByteChannel wbc, int bufferSize)
+    {
+        // validate both arguments before allocating, so a bad call never allocates direct memory
+        this(Preconditions.checkNotNull(wbc), allocateDirectBuffer(bufferSize));
+    }
+
+    // Allocates the direct staging buffer, which must be able to hold the largest primitive (a long/double).
+    private static ByteBuffer allocateDirectBuffer(int bufferSize)
+    {
+        Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accommodate a long/double");
+        return ByteBuffer.allocateDirect(bufferSize);
+    }
+
+    public BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
+    {
+        super(channel);
+        this.buffer = buffer;
+    }
+
+    public BufferedDataOutputStreamPlus(ByteBuffer buffer)
+    {
+        super();
+        this.buffer = buffer;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException
+    {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        if (b == null)
+            throw new NullPointerException();
+
+        // avoid int overflow
+        if (off < 0 || off > b.length || len < 0
+            || len > b.length - off)
+            throw new IndexOutOfBoundsException();
+
+        if (len == 0)
+            return;
+
+        int copied = 0;
+        while (copied < len)
+        {
+            if (buffer.hasRemaining())
+            {
+                int toCopy = Math.min(len - copied, buffer.remaining());
+                buffer.put(b, off + copied, toCopy);
+                copied += toCopy;
+            }
+            else
+            {
+                doFlush();
+            }
+        }
+    }
+
+    // ByteBuffer to use for defensive copies
+    private final ByteBuffer hollowBuffer = MemoryUtil.getHollowDirectByteBuffer();
+
+    /*
+     * Makes a defensive copy of the incoming ByteBuffer and don't modify the position or limit
+     * even temporarily so it is thread-safe WRT to the incoming buffer
+     * (non-Javadoc)
+     * @see org.apache.cassandra.io.util.DataOutputPlus#write(java.nio.ByteBuffer)
+     */
+    @Override
+    public void write(ByteBuffer toWrite) throws IOException
+    {
+        if (toWrite.hasArray())
+        {
+            write(toWrite.array(), toWrite.arrayOffset() + toWrite.position(), toWrite.remaining());
+        }
+        else if (toWrite.isDirect())
+        {
+            if (toWrite.remaining() > buffer.remaining())
+            {
+                doFlush();
+                MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
+                if (toWrite.remaining() > buffer.remaining())
+                {
+                    while (hollowBuffer.hasRemaining())
+                        channel.write(hollowBuffer);
+                }
+                else
+                {
+                    buffer.put(hollowBuffer);
+                }
+            }
+            else
+            {
+                MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
+                buffer.put(hollowBuffer);
+            }
+        }
+        else
+        {
+            // Non-direct buffer without an accessible array (e.g. a read-only heap buffer): the path above only
+            // handles direct buffers, so copy through duplicates instead, one staging-buffer-full at a time.
+            ByteBuffer src = toWrite.duplicate();
+            while (src.hasRemaining())
+            {
+                if (!buffer.hasRemaining())
+                    doFlush();
+                ByteBuffer chunk = src.duplicate();
+                chunk.limit(chunk.position() + Math.min(chunk.remaining(), buffer.remaining()));
+                buffer.put(chunk);
+                src.position(chunk.position());
+            }
+        }
+    }
+
+
+    @Override
+    public void write(int b) throws IOException
+    {
+        ensureRemaining(1);
+        buffer.put((byte) (b & 0xFF));
+    }
+
+    @Override
+    public void writeBoolean(boolean v) throws IOException
+    {
+        ensureRemaining(1);
+        buffer.put(v ? (byte)1 : (byte)0);
+    }
+
+    @Override
+    public void writeByte(int v) throws IOException
+    {
+        write(v);
+    }
+
+    @Override
+    public void writeShort(int v) throws IOException
+    {
+        ensureRemaining(2);
+        buffer.putShort((short) v);
+    }
+
+    @Override
+    public void writeChar(int v) throws IOException
+    {
+        ensureRemaining(2);
+        buffer.putChar((char) v);
+    }
+
+    @Override
+    public void writeInt(int v) throws IOException
+    {
+        ensureRemaining(4);
+        buffer.putInt(v);
+    }
+
+    @Override
+    public void writeLong(long v) throws IOException
+    {
+        ensureRemaining(8);
+        buffer.putLong(v);
+    }
+
+    @Override
+    public void writeFloat(float v) throws IOException
+    {
+        ensureRemaining(4);
+        buffer.putFloat(v);
+    }
+
+    @Override
+    public void writeDouble(double v) throws IOException
+    {
+        ensureRemaining(8);
+        buffer.putDouble(v);
+    }
+
+    @Override
+    public void writeBytes(String s) throws IOException
+    {
+        for (int index = 0; index < s.length(); index++)
+            writeByte(s.charAt(index));
+    }
+
+    @Override
+    public void writeChars(String s) throws IOException
+    {
+        for (int index = 0; index < s.length(); index++)
+            writeChar(s.charAt(index));
+    }
+
+    @Override
+    public void writeUTF(String s) throws IOException
+    {
+        UnbufferedDataOutputStreamPlus.writeUTF(s, this);
+    }
+
+    @Override
+    public void write(Memory memory, long offset, long length) throws IOException
+    {
+        for (ByteBuffer chunk : memory.asByteBuffers(offset, length))
+            write(chunk);
+    }
+
+    // Writes all staged bytes to the channel and empties the staging buffer; subclasses may override (e.g. to grow it)
+    protected void doFlush() throws IOException
+    {
+        buffer.flip();
+
+        while (buffer.hasRemaining())
+            channel.write(buffer);
+
+        buffer.clear();
+    }
+
+    @Override
+    public void flush() throws IOException
+    {
+        doFlush();
+    }
+
+    /**
+     * Flushes any staged bytes, then closes the channel and frees the staging buffer. The channel is closed and
+     * the buffer freed even if the final flush throws; calling close() again has no effect.
+     */
+    @Override
+    public void close() throws IOException
+    {
+        if (buffer == null)
+            return; // already closed
+        try
+        {
+            doFlush();
+        }
+        finally
+        {
+            try
+            {
+                channel.close();
+            }
+            finally
+            {
+                FileUtils.clean(buffer);
+                buffer = null;
+            }
+        }
+    }
+
+    // Flushes at most once, so assumes capacity >= minimum; the ByteBuffer-accepting constructors do not enforce that
+    protected void ensureRemaining(int minimum) throws IOException
+    {
+        if (buffer.remaining() < minimum)
+            doFlush();
+    }
+
+    @Override
+    public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+    {
+        //Don't allow writes to the underlying channel while data is buffered
+        flush();
+        return f.apply(channel);
+    }
+
+    public BufferedDataOutputStreamPlus order(ByteOrder order)
+    {
+        this.buffer.order(order);
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
index 2d36d54..bf926e9 100644
--- a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.util;
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index c2eb08a..b556587 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.io.util;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.nio.channels.WritableByteChannel;
 
 
 /**
@@ -28,7 +28,7 @@ import java.util.Arrays;
  *
  * This class is completely thread unsafe.
  */
-public final class DataOutputBuffer extends DataOutputStreamPlus
+public class DataOutputBuffer extends BufferedDataOutputStreamPlus
 {
     public DataOutputBuffer()
     {
@@ -37,67 +37,125 @@ public final class DataOutputBuffer extends DataOutputStreamPlus

     public DataOutputBuffer(int size)
     {
-        super(new FastByteArrayOutputStream(size));
+        super(ByteBuffer.allocate(size));
+    }
+
+    protected DataOutputBuffer(ByteBuffer buffer)
+    {
+        super(buffer);
     }

     @Override
-    public void write(int b)
+    public void flush() throws IOException
     {
-        try
-        {
-            super.write(b);
-        }
-        catch (IOException e)
-        {
-            throw new AssertionError(e); // FBOS does not throw IOE
-        }
+        // Bytes accumulate in the buffer itself, so there is nothing downstream to flush to.
+        // NOTE(review): BufferedDataOutputStreamPlus.applyToChannel() calls flush(), so applyToChannel() always throws here.
+        throw new UnsupportedOperationException();
     }

+    // Largest capacity ever requested: some VMs reserve header words in arrays (the JDK collections use the same limit).
+    private static final long MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
     @Override
-    public void write(byte[] b, int off, int len)
+    protected void doFlush() throws IOException
     {
-        try
+        // Grow geometrically, but always by at least 8 bytes: one doFlush() must satisfy ensureRemaining(8) and make
+        // progress even from a zero-capacity buffer. long arithmetic avoids int overflow once capacity reaches 1GiB.
+        long capacity = buffer.capacity();
+        reallocate(Math.max(capacity + 8, Math.min(capacity * 2, MAX_ARRAY_SIZE)));
+    }
+
+    /**
+     * Replaces the backing buffer with a heap buffer of capacity {@code newSize}, copying the bytes written so far
+     * and keeping the current byte order.
+     * @throws IllegalArgumentException if {@code newSize} is too large for an array (checked even without -ea)
+     */
+    protected void reallocate(long newSize)
+    {
+        if (newSize > MAX_ARRAY_SIZE)
+            throw new IllegalArgumentException("Cannot grow buffer to " + newSize + " bytes");
+        ByteBuffer newBuffer = ByteBuffer.allocate((int) newSize).order(buffer.order());
+        buffer.flip();
+        newBuffer.put(buffer);
+        buffer = newBuffer;
+    }
+
+    @Override
+    protected WritableByteChannel newDefaultChannel()
+    {
+        return new GrowingChannel();
+    }
+
+    /**
+     * The channel this stream is constructed with (via newDefaultChannel()): rather than sending bytes elsewhere,
+     * it appends them to the backing buffer, growing the buffer when they do not fit.
+     */
+    private final class GrowingChannel implements WritableByteChannel
+    {
+        @Override
+        public int write(ByteBuffer src) throws IOException
         {
-            super.write(b, off, len);
+            int count = src.remaining();
+            if (buffer.remaining() < count)
+            {
+                // room for count bytes past the position (<= capacity); prefer 1.5x growth, up to the array limit
+                long capacity = buffer.capacity();
+                reallocate(Math.max(capacity + count, Math.min(capacity * 3 / 2, MAX_ARRAY_SIZE)));
+            }
+            buffer.put(src);
+            return count;
         }
-        catch (IOException e)
+
+        @Override
+        public boolean isOpen()
+        {
+            return true;
+        }
+
+        @Override
+        public void close() throws IOException
         {
-            throw new AssertionError(e); // FBOS does not throw IOE
         }
     }

-    public void write(ByteBuffer buffer) throws IOException
+    @Override
+    public void close() throws IOException
     {
-        ((FastByteArrayOutputStream) out).write(buffer);
+        // Nothing to release: the heap buffer stays readable via getData()/buffer() after close.
     }

-    /**
-     * Returns the current contents of the buffer. Data is only valid to
-     * {@link #getLength()}.
-     * 
-     * @return the buffer contents
-     */
-    public byte[] getData()
+    /**
+     * Returns a duplicate of the backing buffer flipped for reading: position 0, limit {@link #getLength()}.
+     * It has its own position and limit, so reading from it does not affect this stream.
+     */
+    public ByteBuffer buffer()
     {
-        return ((FastByteArrayOutputStream) out).buf;
+        ByteBuffer result = buffer.duplicate();
+        result.flip();
+        return result;
     }

-    public byte[] toByteArray()
+    /**
+     * Returns the backing array itself, without copying. Only the first {@link #getLength()} bytes are valid,
+     * and a different array is used once the buffer grows.
+     */
+    public byte[] getData()
     {
-        FastByteArrayOutputStream out = (FastByteArrayOutputStream) this.out;
-        return Arrays.copyOfRange(out.buf, 0, out.count);
-
+        return buffer.array();
     }

-    public ByteBuffer asByteBuffer()
+    /** @return the number of bytes written so far, i.e. the valid length of {@link #getData()}. */
+    public int getLength()
     {
-        FastByteArrayOutputStream out = (FastByteArrayOutputStream) this.out;
-        return ByteBuffer.wrap(out.buf, 0, out.count);
+        return buffer.position();
     }

-    /** @return the length of the valid data currently in the buffer. */
-    public int getLength()
+    /** @return a copy of exactly the bytes written so far. */
+    public byte[] toByteArray()
     {
-        return ((FastByteArrayOutputStream) out).count;
+        ByteBuffer view = buffer();
+        byte[] result = new byte[view.remaining()];
+        view.get(result);
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java
deleted file mode 100644
index b40d30e..0000000
--- a/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-
-/**
- * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing
- * its buffer so copies can be avoided.
- *
- * This class is completely thread unsafe.
- */
-public final class DataOutputByteBuffer extends AbstractDataOutput
-{
-
-    final ByteBuffer buffer;
-    public DataOutputByteBuffer(ByteBuffer buffer)
-    {
-        this.buffer = buffer;
-    }
-
-    @Override
-    public void write(int b)
-    {
-        buffer.put((byte) b);
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len)
-    {
-        buffer.put(b, off, len);
-    }
-
-    public void write(ByteBuffer buffer) throws IOException
-    {
-        int len = buffer.remaining();
-        ByteBufferUtil.arrayCopy(buffer, buffer.position(), this.buffer, this.buffer.position(), len);
-        this.buffer.position(this.buffer.position() + len);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index c2901e1..f63c1e5 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -20,12 +20,24 @@ package org.apache.cassandra.io.util;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 
+import com.google.common.base.Function;
+
+/**
+ * Extension to DataOutput that provides for writing ByteBuffer and Memory, potentially with an efficient
+ * implementation that is zero copy or at least has reduced bounds checking overhead.
+ */
 public interface DataOutputPlus extends DataOutput
 {
-
     // write the buffer without modifying its position
     void write(ByteBuffer buffer) throws IOException;

     void write(Memory memory, long offset, long length) throws IOException;
+
+    /**
+     * Applies {@code c} to the underlying channel and returns its result; implementations are expected to flush
+     * any buffered data first. Use this rather than retaining the channel (easy to forget to flush); c should not keep it.
+     */
+    <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java
deleted file mode 100644
index 30cf38b..0000000
--- a/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-
-public class DataOutputStreamAndChannel extends DataOutputStreamPlus
-{
-    private final WritableByteChannel channel;
-    public DataOutputStreamAndChannel(OutputStream os, WritableByteChannel channel)
-    {
-        super(os);
-        this.channel = channel;
-    }
-    public DataOutputStreamAndChannel(WritableByteChannel channel)
-    {
-        this(Channels.newOutputStream(channel), channel);
-    }
-    public DataOutputStreamAndChannel(FileOutputStream fos)
-    {
-        this(fos, fos.getChannel());
-    }
-
-    public void write(ByteBuffer buffer) throws IOException
-    {
-        buffer = buffer.duplicate();
-        while (buffer.remaining() > 0)
-            channel.write(buffer);
-    }
-
-    public WritableByteChannel getChannel()
-    {
-        return channel;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
index 7c1c9d8..6de2879 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
@@ -19,36 +19,121 @@ package org.apache.cassandra.io.util;

 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.ByteBufferUtil;

 /**
- * When possible use {@link DataOutputStreamAndChannel} instead of this class, as it will
- * be more efficient. This class is only for situations where it cannot be used
+ * Abstract base class for DataOutputStreams that accept writes from ByteBuffer or Memory and also provide
+ * access to the underlying WritableByteChannel associated with their output stream.
+ *
+ * If no channel is provided by derived classes then a wrapper channel is provided.
  */
-public class DataOutputStreamPlus extends AbstractDataOutput implements DataOutputPlus
+public abstract class DataOutputStreamPlus extends OutputStream implements DataOutputPlus
 {
-    protected final OutputStream out;
-    public DataOutputStreamPlus(OutputStream out)
+    // Destination for writes; when a derived class supplies none, the wrapper created by newDefaultChannel()
+    protected final WritableByteChannel channel;
+
+    protected DataOutputStreamPlus()
     {
-        this.out = out;
+        this.channel = newDefaultChannel();
     }

-    public void write(byte[] buffer, int offset, int count) throws IOException
+    protected DataOutputStreamPlus(WritableByteChannel channel)
     {
-        out.write(buffer, offset, count);
+        this.channel = channel;
     }

-    public void write(int oneByte) throws IOException
+    // Upper bound on the per-thread scratch array; larger copies go through it in chunks
+    private static final int MAX_BUFFER_SIZE =
+            Integer.getInteger(Config.PROPERTY_PREFIX + "data_output_stream_plus_temp_buffer_size", 8192);
+
+    /*
+     * Returns this thread's scratch array, of length at least min(minSize, MAX_BUFFER_SIZE); callers copy through
+     * it in chunks of its length. Factored out into separate method to create more flexibility around inlining
+     */
+    protected static byte[] retrieveTemporaryBuffer(int minSize)
     {
-        out.write(oneByte);
+        byte[] bytes = tempBuffer.get();
+        // Compare against the capped size: otherwise every request above MAX_BUFFER_SIZE reallocates.
+        if (bytes.length < Math.min(minSize, MAX_BUFFER_SIZE))
+        {
+            // increase in powers of 2, to avoid wasted repeat allocations (long math: no overflow for huge minSize)
+            bytes = new byte[(int) Math.min(MAX_BUFFER_SIZE, 2L * Integer.highestOneBit(minSize))];
+            tempBuffer.set(bytes);
+        }
+        return bytes;
     }

-    public void close() throws IOException
+    // Per-thread scratch space used to copy non-array ByteBuffers, grown on demand by retrieveTemporaryBuffer()
+    private static final ThreadLocal<byte[]> tempBuffer = new ThreadLocal<byte[]>()
     {
-        out.close();
-    }
+        @Override
+        public byte[] initialValue()
+        {
+            return new byte[16];
+        }
+    };

-    public void flush() throws IOException
+    // Derived classes can override and *construct* a real channel, if it is not possible to provide one to the constructor
+    protected WritableByteChannel newDefaultChannel()
     {
-        out.flush();
+        return new WritableByteChannel()
+        {
+
+            @Override
+            public boolean isOpen()
+            {
+                return true;
+            }
+
+            @Override
+            public void close() throws IOException
+            {
+            }
+
+            @Override
+            public int write(ByteBuffer src) throws IOException
+            {
+                int toWrite = src.remaining();
+
+                if (src.hasArray())
+                {
+                    DataOutputStreamPlus.this.write(src.array(), src.arrayOffset() + src.position(), src.remaining());
+                    src.position(src.limit());
+                    return toWrite;
+                }
+
+                if (toWrite < 16)
+                {
+                    int offset = src.position();
+                    for (int i = 0 ; i < toWrite ; i++)
+                        DataOutputStreamPlus.this.write(src.get(i + offset));
+                    src.position(src.limit());
+                    return toWrite;
+                }
+
+                byte[] buf = retrieveTemporaryBuffer(toWrite);
+
+                int totalWritten = 0;
+                while (totalWritten < toWrite)
+                {
+                    int toWriteThisTime = Math.min(buf.length, toWrite - totalWritten);
+
+                    ByteBufferUtil.arrayCopy(src, src.position() + totalWritten, buf, 0, toWriteThisTime);
+
+                    DataOutputStreamPlus.this.write(buf, 0, toWriteThisTime);
+
+                    totalWritten += toWriteThisTime;
+                }
+
+                src.position(src.limit());
+                return totalWritten;
+            }
+
+        };
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java b/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java
deleted file mode 100644
index 0e509b3..0000000
--- a/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/*
- * This file has been modified from Apache Harmony's ByteArrayOutputStream
- * implementation. The synchronized methods of the original have been
- * replaced by non-synchronized methods. This makes certain operations
- * much FASTer, but also *not thread-safe*.
- *
- * This file remains formatted the same as the Apache Harmony original to
- * make patching easier if any bug fixes are made to the Harmony version.
- */
-
-/**
- * A specialized {@link OutputStream} for class for writing content to an
- * (internal) byte array. As bytes are written to this stream, the byte array
- * may be expanded to hold more bytes. When the writing is considered to be
- * finished, a copy of the byte array can be requested from the class.
- *
- * @see ByteArrayOutputStream
- */
-public class FastByteArrayOutputStream extends OutputStream {
-    /**
-     * The byte array containing the bytes written.
-     */
-    protected byte[] buf;
-
-    /**
-     * The number of bytes written.
-     */
-    protected int count;
-
-    /**
-     * Constructs a new ByteArrayOutputStream with a default size of 32 bytes.
-     * If more than 32 bytes are written to this instance, the underlying byte
-     * array will expand.
-     */
-    public FastByteArrayOutputStream() {
-        buf = new byte[32];
-    }
-
-    /**
-     * Constructs a new {@code ByteArrayOutputStream} with a default size of
-     * {@code size} bytes. If more than {@code size} bytes are written to this
-     * instance, the underlying byte array will expand.
-     *
-     * @param size
-     *            initial size for the underlying byte array, must be
-     *            non-negative.
-     * @throws IllegalArgumentException
-     *             if {@code size < 0}.
-     */
-    public FastByteArrayOutputStream(int size) {
-        if (size >= 0) {
-            buf = new byte[size];
-        } else {
-            throw new IllegalArgumentException();
-        }
-    }
-
-    /**
-     * Closes this stream. This releases system resources used for this stream.
-     *
-     * @throws IOException
-     *             if an error occurs while attempting to close this stream.
-     */
-    @Override
-    public void close() throws IOException {
-        /**
-         * Although the spec claims "A closed stream cannot perform output
-         * operations and cannot be reopened.", this implementation must do
-         * nothing.
-         */
-        super.close();
-    }
-
-    private void expand(int i) {
-        /* Can the buffer handle @i more bytes, if not expand it */
-        if (count + i <= buf.length) {
-            return;
-        }
-
-        long expectedExtent = (count + i) * 2L; //long to deal with possible int overflow
-        int newSize = (int) Math.min(Integer.MAX_VALUE - 8, expectedExtent); // MAX_ARRAY_SIZE
-        byte[] newbuf = new byte[newSize];
-        System.arraycopy(buf, 0, newbuf, 0, count);
-        buf = newbuf;
-    }
-
-    /**
-     * Resets this stream to the beginning of the underlying byte array. All
-     * subsequent writes will overwrite any bytes previously stored in this
-     * stream.
-     */
-    public void reset() {
-        count = 0;
-    }
-
-    /**
-     * Returns the total number of bytes written to this stream so far.
-     *
-     * @return the number of bytes written to this stream.
-     */
-    public int size() {
-        return count;
-    }
-
-    /**
-     * Returns the contents of this ByteArrayOutputStream as a byte array. Any
-     * changes made to the receiver after returning will not be reflected in the
-     * byte array returned to the caller.
-     *
-     * @return this stream's current contents as a byte array.
-     */
-    public byte[] toByteArray() {
-        byte[] newArray = new byte[count];
-        System.arraycopy(buf, 0, newArray, 0, count);
-        return newArray;
-    }
-
-    /**
-     * Returns the contents of this ByteArrayOutputStream as a string. Any
-     * changes made to the receiver after returning will not be reflected in the
-     * string returned to the caller.
-     *
-     * @return this stream's current contents as a string.
-     */
-
-    @Override
-    public String toString() {
-        return new String(buf, 0, count);
-    }
-
-    /**
-     * Returns the contents of this ByteArrayOutputStream as a string. Each byte
-     * {@code b} in this stream is converted to a character {@code c} using the
-     * following function:
-     * {@code c == (char)(((hibyte & 0xff) << 8) | (b & 0xff))}. This method is
-     * deprecated and either {@link #toString()} or {@link #toString(String)}
-     * should be used.
-     *
-     * @param hibyte
-     *            the high byte of each resulting Unicode character.
-     * @return this stream's current contents as a string with the high byte set
-     *         to {@code hibyte}.
-     * @deprecated Use {@link #toString()}.
-     */
-    @Deprecated
-    public String toString(int hibyte) {
-        char[] newBuf = new char[size()];
-        for (int i = 0; i < newBuf.length; i++) {
-            newBuf[i] = (char) (((hibyte & 0xff) << 8) | (buf[i] & 0xff));
-        }
-        return new String(newBuf);
-    }
-
-    /**
-     * Returns the contents of this ByteArrayOutputStream as a string converted
-     * according to the encoding declared in {@code enc}.
-     *
-     * @param enc
-     *            a string representing the encoding to use when translating
-     *            this stream to a string.
-     * @return this stream's current contents as an encoded string.
-     * @throws UnsupportedEncodingException
-     *             if the provided encoding is not supported.
-     */
-    public String toString(String enc) throws UnsupportedEncodingException {
-        return new String(buf, 0, count, enc);
-    }
-
-    /**
-     * Writes {@code count} bytes from the byte array {@code buffer} starting at
-     * offset {@code index} to this stream.
-     *
-     * @param buffer
-     *            the buffer to be written.
-     * @param offset
-     *            the initial position in {@code buffer} to retrieve bytes.
-     * @param len
-     *            the number of bytes of {@code buffer} to write.
-     * @throws NullPointerException
-     *             if {@code buffer} is {@code null}.
-     * @throws IndexOutOfBoundsException
-     *             if {@code offset < 0} or {@code len < 0}, or if
-     *             {@code offset + len} is greater than the length of
-     *             {@code buffer}.
-     */
-    @Override
-    public void write(byte[] buffer, int offset, int len) {
-        // avoid int overflow
-        if (offset < 0 || offset > buffer.length || len < 0
-                || len > buffer.length - offset
-                || this.count + len < 0) {
-            throw new IndexOutOfBoundsException();
-        }
-        if (len == 0) {
-            return;
-        }
-
-        /* Expand if necessary */
-        expand(len);
-        System.arraycopy(buffer, offset, buf, this.count, len);
-        this.count += len;
-    }
-
-    public void write(ByteBuffer buffer)
-    {
-        int len = buffer.remaining();
-        expand(len);
-        ByteBufferUtil.arrayCopy(buffer, buffer.position(), buf, this.count, len);
-        this.count += len;
-    }
-
-    /**
-     * Writes the specified byte {@code oneByte} to the OutputStream. Only the
-     * low order byte of {@code oneByte} is written.
-     *
-     * @param oneByte
-     *            the byte to be written.
-     */
-    @Override
-    public void write(int oneByte) {
-        if (count == buf.length) {
-            expand(1);
-        }
-        buf[count++] = (byte) oneByte;
-    }
-
-    /**
-     * Takes the contents of this stream and writes it to the output stream
-     * {@code out}.
-     *
-     * @param out
-     *            an OutputStream on which to write the contents of this stream.
-     * @throws IOException
-     *             if an error occurs while writing to {@code out}.
-     */
-    public void writeTo(OutputStream out) throws IOException {
-        out.write(buf, 0, count);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index 78a3ea5..07d3ca3 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -386,6 +386,7 @@ public class Memory implements AutoCloseable
 
     public ByteBuffer[] asByteBuffers(long offset, long length)
     {
+        checkBounds(offset, offset + length);
         if (size() == 0)
             return NO_BYTE_BUFFERS;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
new file mode 100644
index 0000000..94ba9ed
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Rough equivalent of BufferedInputStream and DataInputStream wrapping the input stream of a File or Socket
+ * Created to work around the fact that when BIS + DIS delegate to NIO for socket IO they will allocate large
+ * thread local direct byte buffers when a large array is used to read.
+ *
+ * There may also be some performance improvement due to using a DBB as the underlying buffer for IO and the removal
+ * of some indirection and delegation when it comes to reading out individual values, but that is not the goal.
+ *
+ * Closing NIODataInputStream will invoke close on the ReadableByteChannel provided at construction.
+ *
+ * NIODataInputStream is not thread safe.
+ */
+public class NIODataInputStream extends InputStream implements DataInput, Closeable
+{
+    private final ReadableByteChannel rbc;
+    private final ByteBuffer buf;
+
+
+    public NIODataInputStream(ReadableByteChannel rbc, int bufferSize)
+    {
+        Preconditions.checkNotNull(rbc);
+        Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accomadate a long/double");
+        this.rbc = rbc;
+        buf = ByteBuffer.allocateDirect(bufferSize);
+        buf.position(0);
+        buf.limit(0);
+    }
+
+    @Override
+    public void readFully(byte[] b) throws IOException
+    {
+        readFully(b, 0, b.length);
+    }
+
+
+    @Override
+    public void readFully(byte[] b, int off, int len) throws IOException
+    {
+        int copied = 0;
+        while (copied < len)
+        {
+            int read = read(b, off + copied, len - copied);
+            if (read < 0)
+                throw new EOFException();
+            copied += read;
+        }
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+        if (b == null)
+            throw new NullPointerException();
+
+        // avoid int overflow
+        if (off < 0 || off > b.length || len < 0
+                || len > b.length - off)
+            throw new IndexOutOfBoundsException();
+
+        if (len == 0)
+            return 0;
+
+        int copied = 0;
+        while (copied < len)
+        {
+            if (buf.hasRemaining())
+            {
+                int toCopy = Math.min(len - copied, buf.remaining());
+                buf.get(b, off + copied, toCopy);
+                copied += toCopy;
+            }
+            else
+            {
+                int read = readNext();
+                if (read < 0 && copied == 0) return -1;
+                if (read <= 0) return copied;
+            }
+        }
+
+        return copied;
+    }
+
+    /*
+     * Refill the buffer, preserving any unread bytes remaining in the buffer
+     */
+    private int readNext() throws IOException
+    {
+        Preconditions.checkState(buf.remaining() != buf.capacity());
+        assert(buf.remaining() < 8);
+
+        /*
+         * If there is data already at the start of the buffer, move the position to the end
+         * If there is data but not at the start, move it to the start
+         * Otherwise move the position to 0 so writes start at the beginning of the buffer
+         *
+         * We go to the trouble of shuffling the bytes remaining for cases where the buffer isn't fully drained
+         * while retrieving a multi-byte value while the position is in the middle.
+         */
+        if (buf.position() == 0 && buf.hasRemaining())
+        {
+            buf.position(buf.limit());
+        }
+        else if (buf.hasRemaining())
+        {
+            ByteBuffer dup = buf.duplicate();
+            buf.clear();
+            buf.put(dup);
+        }
+        else
+        {
+            buf.position(0);
+        }
+
+        buf.limit(buf.capacity());
+
+        int read = 0;
+        while ((read = rbc.read(buf)) == 0) {}
+
+        buf.flip();
+
+        return read;
+    }
+
+    /*
+     * Read at least minimum bytes and throw EOF if that fails
+     */
+    private void readMinimum(int minimum) throws IOException
+    {
+        assert(buf.remaining() < 8);
+        while (buf.remaining() < minimum)
+        {
+            int read = readNext();
+            if (read == -1)
+            {
+                //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here
+                buf.position(0);
+                buf.limit(0);
+                throw new EOFException();
+            }
+        }
+    }
+
+    /*
+     * Ensure the buffer contains the minimum number of readable bytes
+     */
+    private void prepareReadPrimitive(int minimum) throws IOException
+    {
+        if (buf.remaining() < minimum) readMinimum(minimum);
+    }
+
+    @Override
+    public int skipBytes(int n) throws IOException
+    {
+        int skipped = 0;
+
+        while (skipped < n)
+        {
+            int skippedThisTime = (int)skip(n - skipped);
+            if (skippedThisTime <= 0) break;
+            skipped += skippedThisTime;
+        }
+
+        return skipped;
+    }
+
+    @Override
+    public boolean readBoolean() throws IOException
+    {
+        prepareReadPrimitive(1);
+        return buf.get() != 0;
+    }
+
+    @Override
+    public byte readByte() throws IOException
+    {
+        prepareReadPrimitive(1);
+        return buf.get();
+    }
+
+    @Override
+    public int readUnsignedByte() throws IOException
+    {
+        prepareReadPrimitive(1);
+        return buf.get() & 0xff;
+    }
+
+    @Override
+    public short readShort() throws IOException
+    {
+        prepareReadPrimitive(2);
+        return buf.getShort();
+    }
+
+    @Override
+    public int readUnsignedShort() throws IOException
+    {
+        return readShort() & 0xFFFF;
+    }
+
+    @Override
+    public char readChar() throws IOException
+    {
+        prepareReadPrimitive(2);
+        return buf.getChar();
+    }
+
+    @Override
+    public int readInt() throws IOException
+    {
+        prepareReadPrimitive(4);
+        return buf.getInt();
+    }
+
+    @Override
+    public long readLong() throws IOException
+    {
+        prepareReadPrimitive(8);
+        return buf.getLong();
+    }
+
+    @Override
+    public float readFloat() throws IOException
+    {
+        prepareReadPrimitive(4);
+        return buf.getFloat();
+    }
+
+    @Override
+    public double readDouble() throws IOException
+    {
+        prepareReadPrimitive(8);
+        return buf.getDouble();
+    }
+
+    @Override
+    public String readLine() throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String readUTF() throws IOException
+    {
+        return DataInputStream.readUTF(this);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+            rbc.close();
+    }
+
+    @Override
+    public int read() throws IOException
+    {
+        return readUnsignedByte();
+    }
+
+    @Override
+    public int available() throws IOException
+    {
+        if (rbc instanceof SeekableByteChannel)
+        {
+            SeekableByteChannel sbc = (SeekableByteChannel)rbc;
+            long remainder = Math.max(0, sbc.size() - sbc.position());
+            return (remainder > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)(remainder + buf.remaining());
+        }
+        return buf.remaining();
+    }
+
+    @Override
+    public void reset() throws IOException
+    {
+        throw new IOException("mark/reset not supported");
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 6c87cf9..1fc374f 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -22,122 +22,79 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-public class SafeMemoryWriter extends AbstractDataOutput implements DataOutputPlus
+public class SafeMemoryWriter extends DataOutputBuffer
 {
-    private ByteOrder order = ByteOrder.BIG_ENDIAN;
-    private SafeMemory buffer;
-    private long length;
+    private SafeMemory memory;
 
     public SafeMemoryWriter(long initialCapacity)
     {
-        buffer = new SafeMemory(initialCapacity);
+        this(new SafeMemory(initialCapacity));
     }
 
-    @Override
-    public void write(byte[] buffer, int offset, int count)
-    {
-        long newLength = ensureCapacity(count);
-        this.buffer.setBytes(this.length, buffer, offset, count);
-        this.length = newLength;
-    }
-
-    @Override
-    public void write(int oneByte)
+    private SafeMemoryWriter(SafeMemory memory)
     {
-        long newLength = ensureCapacity(1);
-        buffer.setByte(length++, (byte) oneByte);
-        length = newLength;
+        super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN));
+        this.memory = memory;
     }
 
-    @Override
-    public void writeShort(int val) throws IOException
+    public SafeMemory currentBuffer()
     {
-        if (order != ByteOrder.nativeOrder())
-            val = Short.reverseBytes((short) val);
-        long newLength = ensureCapacity(2);
-        buffer.setShort(length, (short) val);
-        length = newLength;
+        return memory;
     }
 
-    @Override
-    public void writeInt(int val)
+    protected void reallocate(long newCapacity)
     {
-        if (order != ByteOrder.nativeOrder())
-            val = Integer.reverseBytes(val);
-        long newLength = ensureCapacity(4);
-        buffer.setInt(length, val);
-        length = newLength;
-    }
+        if (newCapacity != capacity())
+        {
+            long position = length();
+            ByteOrder order = buffer.order();
 
-    @Override
-    public void writeLong(long val)
-    {
-        if (order != ByteOrder.nativeOrder())
-            val = Long.reverseBytes(val);
-        long newLength = ensureCapacity(8);
-        buffer.setLong(length, val);
-        length = newLength;
-    }
+            SafeMemory oldBuffer = memory;
+            memory = this.memory.copy(newCapacity);
+            buffer = tailBuffer(memory);
 
-    @Override
-    public void write(ByteBuffer buffer)
-    {
-        long newLength = ensureCapacity(buffer.remaining());
-        this.buffer.setBytes(length, buffer);
-        length = newLength;
-    }
+            int newPosition = (int) (position - tailOffset(memory));
+            buffer.position(newPosition);
+            buffer.order(order);
 
-    @Override
-    public void write(Memory memory, long offset, long size)
-    {
-        long newLength = ensureCapacity(size);
-        buffer.put(length, memory, offset, size);
-        length = newLength;
-    }
-
-    private long ensureCapacity(long size)
-    {
-        long newLength = this.length + size;
-        if (newLength > buffer.size())
-            setCapacity(Math.max(newLength, buffer.size() + (buffer.size() / 2)));
-        return newLength;
-    }
-
-    public SafeMemory currentBuffer()
-    {
-        return buffer;
+            oldBuffer.free();
+        }
     }
 
     public void setCapacity(long newCapacity)
     {
-        if (newCapacity != capacity())
-        {
-            SafeMemory oldBuffer = buffer;
-            buffer = this.buffer.copy(newCapacity);
-            oldBuffer.free();
-        }
+        reallocate(newCapacity);
     }
 
     public void close()
     {
-        buffer.close();
+        memory.close();
     }
 
     public long length()
     {
-        return length;
+        return tailOffset(memory) +  buffer.position();
     }
 
     public long capacity()
     {
-        return buffer.size();
+        return memory.size();
     }
 
-    // TODO: consider hoisting this into DataOutputPlus, since most implementations can copy with this gracefully
-    // this would simplify IndexSummary.IndexSummarySerializer.serialize()
+    @Override
     public SafeMemoryWriter order(ByteOrder order)
     {
-        this.order = order;
+        super.order(order);
         return this;
     }
+
+    private static long tailOffset(Memory memory)
+    {
+        return Math.max(0, memory.size - Integer.MAX_VALUE);
+    }
+
+    private static ByteBuffer tailBuffer(Memory memory)
+    {
+        return memory.asByteBuffer(tailOffset(memory), (int) Math.min(memory.size, Integer.MAX_VALUE));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index f8ea92f..c4fef07 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -26,7 +26,6 @@ import java.nio.file.StandardOpenOption;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
@@ -97,7 +96,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         fd = CLibrary.getfd(channel);
 
         directoryFD = CLibrary.tryOpenDirectory(file.getParent());
-        stream = new DataOutputStreamAndChannel(this, this);
+        stream = new WrappedDataOutputStreamPlus(this, this);
     }
 
     /**


Mime
View raw message