cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [2/5] ByteBuffer write() methods for serializing sstables
Date Wed, 19 Mar 2014 17:02:30 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 025daab..947e5d5 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.streaming.messages;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
@@ -27,6 +26,7 @@ import java.util.UUID;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -96,7 +96,7 @@ public class StreamInitMessage
 
     private static class StreamInitMessageSerializer implements IVersionedSerializer<StreamInitMessage>
     {
-        public void serialize(StreamInitMessage message, DataOutput out, int version) throws IOException
+        public void serialize(StreamInitMessage message, DataOutputPlus out, int version) throws IOException
         {
             CompactEndpointSerializationHelper.serialize(message.from, out);
             UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 7010c95..50c3911 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.streaming.StreamSession;
 
 /**
@@ -34,14 +35,13 @@ public abstract class StreamMessage
     /** Streaming protocol version */
     public static final int CURRENT_VERSION = 1;
 
-    public static void serialize(StreamMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException
+    public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
     {
         ByteBuffer buff = ByteBuffer.allocate(1);
         // message type
         buff.put(message.type.type);
         buff.flip();
-        while (buff.hasRemaining())
-            out.write(buff);
+        out.write(buff);
         message.type.outSerializer.serialize(message, out, version, session);
     }
 
@@ -66,7 +66,7 @@ public abstract class StreamMessage
     public static interface Serializer<V extends StreamMessage>
     {
         V deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException;
-        void serialize(V message, WritableByteChannel out, int version, StreamSession session) throws IOException;
+        void serialize(V message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException;
     }
 
     /** StreamMessage types */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 650c74e..62ef167 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1870,7 +1870,7 @@ public class CassandraServer implements Cassandra.Iface
 
                     decompressor.end();
 
-                    queryString = new String(decompressed.getData(), 0, decompressed.size(), "UTF-8");
+                    queryString = new String(decompressed.getData(), 0, decompressed.getLength(), "UTF-8");
                     break;
                 case NONE:
                     try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 6cce81d..cff6e71 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -169,7 +169,6 @@ public class SSTableExport
      * Serialize a given cell to the JSON format
      *
      * @param cell     cell presentation
-     * @param comparator columns comparator
      * @param cfMetaData Column Family metadata (to get validator)
      * @return cell as serialized list
      */
@@ -178,7 +177,7 @@ public class SSTableExport
         CellNameType comparator = cfMetaData.comparator;
         ArrayList<Object> serializedColumn = new ArrayList<Object>();
 
-        ByteBuffer value = ByteBufferUtil.clone(cell.value());
+        ByteBuffer value = cell.value();
 
         serializedColumn.add(comparator.getString(cell.name()));
         if (cell instanceof DeletedCell)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java b/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java
index 8c1aab8..c797331 100644
--- a/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java
+++ b/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java
@@ -46,7 +46,7 @@ public final class AtomicLongArrayUpdater {
                         } catch (NoSuchFieldException e)
                         {
                             // It doesn't matter what we throw;
-                            // it's swallowed in getBestComparer().
+                            // it's swallowed in getBest().
                             throw new Error();
                         } catch (IllegalAccessException e)
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
index 1ec1459..b95544c 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
@@ -18,18 +18,18 @@
 package org.apache.cassandra.utils;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.obs.IBitSet;
 import org.apache.cassandra.utils.obs.OffHeapBitSet;
 import org.apache.cassandra.utils.obs.OpenBitSet;
 
 abstract class BloomFilterSerializer implements ISerializer<BloomFilter>
 {
-    public void serialize(BloomFilter bf, DataOutput out) throws IOException
+    public void serialize(BloomFilter bf, DataOutputPlus out) throws IOException
     {
         out.writeInt(bf.hashCount);
         bf.bitset.serialize(out);
@@ -51,7 +51,7 @@ abstract class BloomFilterSerializer implements ISerializer<BloomFilter>
 
     /**
      * Calculates a serialized size of the given Bloom Filter
-     * @see BloomFilterSerializer#serialize(BloomFilter, DataOutput)
+     * @see org.apache.cassandra.io.ISerializer#serialize(Object, org.apache.cassandra.io.util.DataOutputPlus)
      *
      * @param bf Bloom filter to calculate serialized size
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BooleanSerializer.java b/src/java/org/apache/cassandra/utils/BooleanSerializer.java
index 0c37f67..f1707c3 100644
--- a/src/java/org/apache/cassandra/utils/BooleanSerializer.java
+++ b/src/java/org/apache/cassandra/utils/BooleanSerializer.java
@@ -22,12 +22,13 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class BooleanSerializer implements IVersionedSerializer<Boolean>
 {
     public static BooleanSerializer serializer = new BooleanSerializer();
 
-    public void serialize(Boolean b, DataOutput out, int version) throws IOException
+    public void serialize(Boolean b, DataOutputPlus out, int version) throws IOException
     {
         out.writeBoolean(b);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 0d1b141..1bd5d4e 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.UUID;
 
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
 
@@ -81,35 +82,17 @@ public class ByteBufferUtil
     {
         assert o1 != null;
         assert o2 != null;
-        if (o1 == o2)
-            return 0;
-
-        if (o1.hasArray() && o2.hasArray())
-        {
-            return FBUtilities.compareUnsigned(o1.array(), o2.array(), o1.position() + o1.arrayOffset(),
-                    o2.position() + o2.arrayOffset(), o1.remaining(), o2.remaining());
-        }
-
-        int end1 = o1.position() + o1.remaining();
-        int end2 = o2.position() + o2.remaining();
-        for (int i = o1.position(), j = o2.position(); i < end1 && j < end2; i++, j++)
-        {
-            int a = (o1.get(i) & 0xff);
-            int b = (o2.get(j) & 0xff);
-            if (a != b)
-                return a - b;
-        }
-        return o1.remaining() - o2.remaining();
+        return FastByteOperations.compareUnsigned(o1, o2);
     }
 
     public static int compare(byte[] o1, ByteBuffer o2)
     {
-        return compareUnsigned(ByteBuffer.wrap(o1), o2);
+        return FastByteOperations.compareUnsigned(o1, 0, o1.length, o2);
     }
 
     public static int compare(ByteBuffer o1, byte[] o2)
     {
-        return compareUnsigned(o1, ByteBuffer.wrap(o2));
+        return FastByteOperations.compareUnsigned(o1, o2, 0, o2.length);
     }
 
     /**
@@ -270,12 +253,9 @@ public class ByteBufferUtil
         return clone;
     }
 
-    public static void arrayCopy(ByteBuffer buffer, int position, byte[] bytes, int offset, int length)
+    public static void arrayCopy(ByteBuffer src, int srcPos, byte[] dst, int dstPos, int length)
     {
-        if (buffer.hasArray())
-            System.arraycopy(buffer.array(), buffer.arrayOffset() + position, bytes, offset, length);
-        else
-            ((ByteBuffer) buffer.duplicate().position(position)).get(bytes, offset, length);
+        FastByteOperations.copy(src, srcPos, dst, dstPos, length);
     }
 
     /**
@@ -290,29 +270,13 @@ public class ByteBufferUtil
      */
     public static void arrayCopy(ByteBuffer src, int srcPos, ByteBuffer dst, int dstPos, int length)
     {
-        if (src.hasArray() && dst.hasArray())
-        {
-            System.arraycopy(src.array(),
-                             src.arrayOffset() + srcPos,
-                             dst.array(),
-                             dst.arrayOffset() + dstPos,
-                             length);
-        }
-        else
-        {
-            if (src.limit() - srcPos < length || dst.limit() - dstPos < length)
-                throw new IndexOutOfBoundsException();
-
-            for (int i = 0; i < length; i++)
-                // TODO: ByteBuffer.put is polymorphic, and might be slow here
-                dst.put(dstPos++, src.get(srcPos++));
-        }
+        FastByteOperations.copy(src, srcPos, dst, dstPos, length);
     }
 
-    public static void writeWithLength(ByteBuffer bytes, DataOutput out) throws IOException
+    public static void writeWithLength(ByteBuffer bytes, DataOutputPlus out) throws IOException
     {
         out.writeInt(bytes.remaining());
-        write(bytes, out); // writing data bytes to output source
+        out.write(bytes);
     }
 
     public static void writeWithLength(byte[] bytes, DataOutput out) throws IOException
@@ -321,27 +285,20 @@ public class ByteBufferUtil
         out.write(bytes);
     }
 
-    public static void write(ByteBuffer buffer, DataOutput out) throws IOException
+    public static void writeWithShortLength(ByteBuffer buffer, DataOutputPlus out) throws IOException
     {
-        if (buffer.hasArray())
-        {
-            out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
-        }
-        else
-        {
-            for (int i = buffer.position(); i < buffer.limit(); i++)
-            {
-                out.writeByte(buffer.get(i));
-            }
-        }
+        int length = buffer.remaining();
+        assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : length;
+        out.writeShort(length);
+        out.write(buffer);
     }
 
-    public static void writeWithShortLength(ByteBuffer buffer, DataOutput out) throws IOException
+    public static void writeWithShortLength(byte[] buffer, DataOutput out) throws IOException
     {
-        int length = buffer.remaining();
+        int length = buffer.length;
         assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : length;
         out.writeShort(length);
-        write(buffer, out); // writing data bytes to output source
+        out.write(buffer);
     }
 
     public static ByteBuffer readWithLength(DataInput in) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
index 9ed7738..196a3b9 100644
--- a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
+++ b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.utils;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLongArray;
@@ -27,6 +26,8 @@ import com.google.common.base.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
 import org.slf4j.Logger;
 
 public class EstimatedHistogram
@@ -317,7 +318,7 @@ public class EstimatedHistogram
 
     public static class EstimatedHistogramSerializer implements ISerializer<EstimatedHistogram>
     {
-        public void serialize(EstimatedHistogram eh, DataOutput out) throws IOException
+        public void serialize(EstimatedHistogram eh, DataOutputPlus out) throws IOException
         {
             long[] offsets = eh.getBucketOffsets();
             long[] buckets = eh.getBuckets(false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 7b574e2..12c393a 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -213,7 +213,7 @@ public class FBUtilities
 
     public static int compareUnsigned(byte[] bytes1, byte[] bytes2, int offset1, int offset2, int len1, int len2)
     {
-        return FastByteComparisons.compareTo(bytes1, offset1, len1, bytes2, offset2, len2);
+        return FastByteOperations.compareUnsigned(bytes1, offset1, len1, bytes2, offset2, len2);
     }
 
     public static int compareUnsigned(byte[] bytes1, byte[] bytes2)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/FastByteComparisons.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FastByteComparisons.java b/src/java/org/apache/cassandra/utils/FastByteComparisons.java
deleted file mode 100644
index 4be6cd4..0000000
--- a/src/java/org/apache/cassandra/utils/FastByteComparisons.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.lang.reflect.Field;
-import java.nio.ByteOrder;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-
-import sun.misc.Unsafe;
-
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedBytes;
-
-/**
- * Utility code to do optimized byte-array comparison.
- * This is borrowed and slightly modified from Guava's {@link UnsignedBytes}
- * class to be able to compare arrays that start at non-zero offsets.
- */
-abstract class FastByteComparisons {
-
-  /**
-   * Lexicographically compare two byte arrays.
-   */
-  public static int compareTo(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-    return LexicographicalComparerHolder.BEST_COMPARER.compareTo(
-        b1, s1, l1, b2, s2, l2);
-  }
-
-  private interface Comparer<T> {
-    abstract public int compareTo(T buffer1, int offset1, int length1,
-        T buffer2, int offset2, int length2);
-  }
-
-  private static Comparer<byte[]> lexicographicalComparerJavaImpl() {
-    return LexicographicalComparerHolder.PureJavaComparer.INSTANCE;
-  }
-
-
-  /**
-   * Provides a lexicographical comparer implementation; either a Java
-   * implementation or a faster implementation based on {@link Unsafe}.
-   *
-   * <p>Uses reflection to gracefully fall back to the Java implementation if
-   * {@code Unsafe} isn't available.
-   */
-  private static class LexicographicalComparerHolder {
-    static final String UNSAFE_COMPARER_NAME =
-        LexicographicalComparerHolder.class.getName() + "$UnsafeComparer";
-
-    static final Comparer<byte[]> BEST_COMPARER = getBestComparer();
-    /**
-     * Returns the Unsafe-using Comparer, or falls back to the pure-Java
-     * implementation if unable to do so.
-     */
-    static Comparer<byte[]> getBestComparer() {
-      String arch = System.getProperty("os.arch");
-      boolean unaligned = arch.equals("i386") || arch.equals("x86")
-                    || arch.equals("amd64") || arch.equals("x86_64");
-      if (!unaligned)
-        return lexicographicalComparerJavaImpl();
-      try {
-        Class<?> theClass = Class.forName(UNSAFE_COMPARER_NAME);
-
-        // yes, UnsafeComparer does implement Comparer<byte[]>
-        @SuppressWarnings("unchecked")
-        Comparer<byte[]> comparer =
-          (Comparer<byte[]>) theClass.getEnumConstants()[0];
-        return comparer;
-      } catch (Throwable t) { // ensure we really catch *everything*
-        return lexicographicalComparerJavaImpl();
-      }
-    }
-
-    private enum PureJavaComparer implements Comparer<byte[]> {
-      INSTANCE;
-
-      @Override
-      public int compareTo(byte[] buffer1, int offset1, int length1,
-          byte[] buffer2, int offset2, int length2) {
-        // Short circuit equal case
-        if (buffer1 == buffer2 &&
-            offset1 == offset2 &&
-            length1 == length2) {
-          return 0;
-        }
-        int end1 = offset1 + length1;
-        int end2 = offset2 + length2;
-        for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
-          int a = (buffer1[i] & 0xff);
-          int b = (buffer2[j] & 0xff);
-          if (a != b) {
-            return a - b;
-          }
-        }
-        return length1 - length2;
-      }
-    }
-
-    @SuppressWarnings("unused") // used via reflection
-    private enum UnsafeComparer implements Comparer<byte[]> {
-      INSTANCE;
-
-      static final Unsafe theUnsafe;
-
-      /** The offset to the first element in a byte array. */
-      static final int BYTE_ARRAY_BASE_OFFSET;
-
-      static {
-        theUnsafe = (Unsafe) AccessController.doPrivileged(
-            new PrivilegedAction<Object>() {
-              @Override
-              public Object run() {
-                try {
-                  Field f = Unsafe.class.getDeclaredField("theUnsafe");
-                  f.setAccessible(true);
-                  return f.get(null);
-                } catch (NoSuchFieldException e) {
-                  // It doesn't matter what we throw;
-                  // it's swallowed in getBestComparer().
-                  throw new Error();
-                } catch (IllegalAccessException e) {
-                  throw new Error();
-                }
-              }
-            });
-
-        BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
-
-        // sanity check - this should never fail
-        if (theUnsafe.arrayIndexScale(byte[].class) != 1) {
-          throw new AssertionError();
-        }
-      }
-
-      static final boolean littleEndian =
-        ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
-
-      /**
-       * Returns true if x1 is less than x2, when both values are treated as
-       * unsigned.
-       */
-      static boolean lessThanUnsigned(long x1, long x2) {
-        return (x1 + Long.MIN_VALUE) < (x2 + Long.MIN_VALUE);
-      }
-
-      /**
-       * Lexicographically compare two arrays.
-       *
-       * @param buffer1 left operand
-       * @param buffer2 right operand
-       * @param offset1 Where to start comparing in the left buffer
-       * @param offset2 Where to start comparing in the right buffer
-       * @param length1 How much to compare from the left buffer
-       * @param length2 How much to compare from the right buffer
-       * @return 0 if equal, < 0 if left is less than right, etc.
-       */
-      @Override
-      public int compareTo(byte[] buffer1, int offset1, int length1,
-          byte[] buffer2, int offset2, int length2) {
-        // Short circuit equal case
-        if (buffer1 == buffer2 &&
-            offset1 == offset2 &&
-            length1 == length2) {
-          return 0;
-        }
-        int minLength = Math.min(length1, length2);
-        int minWords = minLength / Longs.BYTES;
-        int offset1Adj = offset1 + BYTE_ARRAY_BASE_OFFSET;
-        int offset2Adj = offset2 + BYTE_ARRAY_BASE_OFFSET;
-
-        /*
-         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
-         * time is no slower than comparing 4 bytes at a time even on 32-bit.
-         * On the other hand, it is substantially faster on 64-bit.
-         */
-        for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
-          long lw = theUnsafe.getLong(buffer1, offset1Adj + (long) i);
-          long rw = theUnsafe.getLong(buffer2, offset2Adj + (long) i);
-          long diff = lw ^ rw;
-
-          if (diff != 0) {
-            if (!littleEndian) {
-              return lessThanUnsigned(lw, rw) ? -1 : 1;
-            }
-
-            // Use binary search
-            int n = 0;
-            int y;
-            int x = (int) diff;
-            if (x == 0) {
-              x = (int) (diff >>> 32);
-              n = 32;
-            }
-
-            y = x << 16;
-            if (y == 0) {
-              n += 16;
-            } else {
-              x = y;
-            }
-
-            y = x << 8;
-            if (y == 0) {
-              n += 8;
-            }
-            return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
-          }
-        }
-
-        // The epilogue to cover the last (minLength % 8) elements.
-        for (int i = minWords * Longs.BYTES; i < minLength; i++) {
-          int result = UnsignedBytes.compare(
-              buffer1[offset1 + i],
-              buffer2[offset2 + i]);
-          if (result != 0) {
-            return result;
-          }
-        }
-        return length1 - length2;
-      }
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/FastByteOperations.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FastByteOperations.java b/src/java/org/apache/cassandra/utils/FastByteOperations.java
new file mode 100644
index 0000000..e60b096
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/FastByteOperations.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.lang.reflect.Field;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
+
+import sun.misc.Unsafe;
+
+/**
+ * Utility code to do optimized byte-array comparison.
+ * This is borrowed and slightly modified from Guava's {@link UnsignedBytes}
+ * class to be able to compare arrays that start at non-zero offsets.
+ */
+class FastByteOperations
+{
+
+    /**
+     * Lexicographically compare two byte arrays.
+     */
+    public static int compareUnsigned(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
+    {
+        return BestHolder.BEST.compare(b1, s1, l1, b2, s2, l2);
+    }
+
+    public static int compareUnsigned(ByteBuffer b1, byte[] b2, int s2, int l2)
+    {
+        return BestHolder.BEST.compare(b1, b2, s2, l2);
+    }
+
+    public static int compareUnsigned(byte[] b1, int s1, int l1, ByteBuffer b2)
+    {
+        return -BestHolder.BEST.compare(b2, b1, s1, l1);
+    }
+
+    public static int compareUnsigned(ByteBuffer b1, ByteBuffer b2)
+    {
+        if (b1 == b2)
+            return 0;
+        return BestHolder.BEST.compare(b1, b2);
+    }
+
+    public static void copy(ByteBuffer src, int srcPosition, byte[] trg, int trgPosition, int length)
+    {
+        BestHolder.BEST.copy(src, srcPosition, trg, trgPosition, length);
+    }
+
+    public static void copy(ByteBuffer src, int srcPosition, ByteBuffer trg, int trgPosition, int length)
+    {
+        BestHolder.BEST.copy(src, srcPosition, trg, trgPosition, length);
+    }
+
+    public interface ByteOperations
+    {
+        abstract public int compare(byte[] buffer1, int offset1, int length1,
+                                    byte[] buffer2, int offset2, int length2);
+
+        abstract public int compare(ByteBuffer buffer1, byte[] buffer2, int offset2, int length2);
+
+        abstract public int compare(ByteBuffer buffer1, ByteBuffer buffer2);
+
+        abstract public void copy(ByteBuffer src, int srcPosition, byte[] trg, int trgPosition, int length);
+
+        abstract public void copy(ByteBuffer src, int srcPosition, ByteBuffer trg, int trgPosition, int length);
+    }
+
+    /**
+     * Provides a lexicographical comparer implementation; either a Java
+     * implementation or a faster implementation based on {@link Unsafe}.
+     * <p/>
+     * <p>Uses reflection to gracefully fall back to the Java implementation if
+     * {@code Unsafe} isn't available.
+     */
+    private static class BestHolder
+    {
+        static final String UNSAFE_COMPARER_NAME = FastByteOperations.class.getName() + "$UnsafeOperations";
+        static final ByteOperations BEST = getBest();
+
+        /**
+         * Returns the Unsafe-using Comparer, or falls back to the pure-Java
+         * implementation if unable to do so.
+         */
+        static ByteOperations getBest()
+        {
+            String arch = System.getProperty("os.arch");
+            boolean unaligned = arch.equals("i386") || arch.equals("x86")
+                                || arch.equals("amd64") || arch.equals("x86_64");
+            if (!unaligned)
+                return new PureJavaOperations();
+            try
+            {
+                Class<?> theClass = Class.forName(UNSAFE_COMPARER_NAME);
+
+                // yes, UnsafeComparer does implement Comparer<byte[]>
+                @SuppressWarnings("unchecked")
+                ByteOperations comparer = (ByteOperations) theClass.getConstructor().newInstance();
+                return comparer;
+            }
+            catch (Throwable t)
+            { // ensure we really catch *everything*
+                return new PureJavaOperations();
+            }
+        }
+
+    }
+
+    @SuppressWarnings("unused") // used via reflection
+    public static final class UnsafeOperations implements ByteOperations
+    {
+        static final Unsafe theUnsafe;
+        /**
+         * The offset to the first element in a byte array.
+         */
+        static final long BYTE_ARRAY_BASE_OFFSET;
+        static final long DIRECT_BUFFER_ADDRESS_OFFSET;
+
+        static
+        {
+            theUnsafe = (Unsafe) AccessController.doPrivileged(
+                      new PrivilegedAction<Object>()
+                      {
+                          @Override
+                          public Object run()
+                          {
+                              try
+                              {
+                                  Field f = Unsafe.class.getDeclaredField("theUnsafe");
+                                  f.setAccessible(true);
+                                  return f.get(null);
+                              }
+                              catch (NoSuchFieldException e)
+                              {
+                                  // It doesn't matter what we throw;
+                                  // it's swallowed in getBest().
+                                  throw new Error();
+                              }
+                              catch (IllegalAccessException e)
+                              {
+                                  throw new Error();
+                              }
+                          }
+                      });
+
+            try
+            {
+                BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
+                DIRECT_BUFFER_ADDRESS_OFFSET = theUnsafe.objectFieldOffset(Buffer.class.getDeclaredField("address"));
+            }
+            catch (Exception e)
+            {
+                throw new AssertionError(e);
+            }
+
+            // sanity check - this should never fail
+            if (theUnsafe.arrayIndexScale(byte[].class) != 1)
+            {
+                throw new AssertionError();
+            }
+        }
+
+        static final boolean littleEndian =
+            ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+        /**
+         * Returns true if x1 is less than x2, when both values are treated as
+         * unsigned.
+         */
+        static boolean lessThanUnsigned(long x1, long x2)
+        {
+            return (x1 + Long.MIN_VALUE) < (x2 + Long.MIN_VALUE);
+        }
+
+        public int compare(byte[] buffer1, int offset1, int length1, byte[] buffer2, int offset2, int length2)
+        {
+            return compareTo(buffer1, BYTE_ARRAY_BASE_OFFSET + offset1, length1, buffer2, BYTE_ARRAY_BASE_OFFSET + offset2, length2);
+        }
+
+        public int compare(ByteBuffer buffer1, byte[] buffer2, int offset2, int length2)
+        {
+            final Object obj1;
+            final long offset1;
+            final int length1;
+            if (buffer1.hasArray())
+            {
+                obj1 = buffer1.array();
+                offset1 = BYTE_ARRAY_BASE_OFFSET + buffer1.arrayOffset() + buffer1.position();
+                length1 = buffer1.remaining();
+            }
+            else
+            {
+                obj1 = null;
+                offset1 = theUnsafe.getLong(buffer1, DIRECT_BUFFER_ADDRESS_OFFSET) + buffer1.position();
+                length1 = buffer1.remaining();
+            }
+            return compareTo(obj1, offset1, length1, buffer2, BYTE_ARRAY_BASE_OFFSET + offset2, length2);
+        }
+
+        public int compare(ByteBuffer buffer1, ByteBuffer buffer2)
+        {
+            final Object obj1, obj2;
+            final long offset1, offset2;
+            final int length1, length2;
+            if (buffer1.hasArray())
+            {
+                obj1 = buffer1.array();
+                offset1 = BYTE_ARRAY_BASE_OFFSET + buffer1.arrayOffset() + buffer1.position();
+                length1 = buffer1.remaining();
+            }
+            else
+            {
+                obj1 = null;
+                offset1 = theUnsafe.getLong(buffer1, DIRECT_BUFFER_ADDRESS_OFFSET) + buffer1.position();
+                length1 = buffer1.remaining();
+            }
+            if (buffer2.hasArray())
+            {
+                obj2 = buffer2.array();
+                offset2 = BYTE_ARRAY_BASE_OFFSET + buffer2.arrayOffset() + buffer2.position();
+                length2 = buffer2.remaining();
+            }
+            else
+            {
+                obj2 = null;
+                offset2 = theUnsafe.getLong(buffer2, DIRECT_BUFFER_ADDRESS_OFFSET) + buffer2.position();
+                length2 = buffer2.remaining();
+            }
+            return compareTo(obj1, offset1, length1, obj2, offset2, length2);
+        }
+
+        public void copy(ByteBuffer src, int srcPosition, byte[] trg, int trgPosition, int length)
+        {
+            if (src.hasArray())
+            {
+                System.arraycopy(src.array(), src.arrayOffset() + srcPosition, trg, trgPosition, length);
+                return;
+            }
+            long srcOffset = srcPosition + theUnsafe.getLong(src, DIRECT_BUFFER_ADDRESS_OFFSET);
+            copy(null, srcOffset, trg, BYTE_ARRAY_BASE_OFFSET + trgPosition, length);
+        }
+
+        public void copy(ByteBuffer srcBuf, int srcPosition, ByteBuffer trgBuf, int trgPosition, int length)
+        {
+            if (srcBuf.hasArray() && trgBuf.hasArray())
+            {
+                System.arraycopy(srcBuf.array(), srcBuf.arrayOffset() + srcPosition, trgBuf.array(), trgBuf.arrayOffset() + trgPosition, length);
+                return;
+            }
+            Object src, trg;
+            long srcOffset, trgOffset;
+            if (srcBuf.isDirect())
+            {
+                srcOffset = srcPosition + theUnsafe.getLong(srcBuf, DIRECT_BUFFER_ADDRESS_OFFSET);
+                src = null;
+            }
+            else
+            {
+                src = srcBuf.array();
+                srcOffset = BYTE_ARRAY_BASE_OFFSET + srcBuf.arrayOffset() + srcPosition;
+            }
+            if (trgBuf.isDirect())
+            {
+                trgOffset = trgPosition + theUnsafe.getLong(trgBuf, DIRECT_BUFFER_ADDRESS_OFFSET);
+                trg = null;
+            }
+            else
+            {
+                trg = trgBuf.array();
+                trgOffset = BYTE_ARRAY_BASE_OFFSET + trgBuf.arrayOffset() + trgPosition;
+            }
+            copy(src, srcOffset, trg, trgOffset, length);
+        }
+
+        // 1M, copied from java.nio.Bits (unfortunately a package-private class)
+        private static final long UNSAFE_COPY_THRESHOLD = 1 << 20;
+
+        static void copy(Object src, long srcOffset, Object dst, long dstOffset, long length)
+        {
+            while (length > 0) {
+                long size = (length > UNSAFE_COPY_THRESHOLD) ? UNSAFE_COPY_THRESHOLD : length;
+                // if src or dst are null, the offsets are absolute base addresses:
+                theUnsafe.copyMemory(src, srcOffset, dst, dstOffset, size);
+                length -= size;
+                srcOffset += size;
+                dstOffset += size;
+            }
+        }
+
+
+        /**
+         * Lexicographically compare two arrays.
+         *
+         * @param buffer1 left operand: a byte[] or null
+         * @param buffer2 right operand: a byte[] or null
+         * @param memoryOffset1 Where to start comparing in the left buffer (pure memory address if buffer1 is null, or relative otherwise)
+         * @param memoryOffset2 Where to start comparing in the right buffer (pure memory address if buffer1 is null, or relative otherwise)
+         * @param length1 How much to compare from the left buffer
+         * @param length2 How much to compare from the right buffer
+         * @return 0 if equal, < 0 if left is less than right, etc.
+         */
+        public int compareTo(Object buffer1, long memoryOffset1, int length1,
+                             Object buffer2, long memoryOffset2, int length2)
+        {
+            // Short circuit equal case
+            if (buffer1 == buffer2 && memoryOffset1 == memoryOffset2 && length1 == length2)
+                return 0;
+
+            int minLength = Math.min(length1, length2);
+            int minWords = minLength / Longs.BYTES;
+
+            /*
+             * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+             * time is no slower than comparing 4 bytes at a time even on 32-bit.
+             * On the other hand, it is substantially faster on 64-bit.
+             */
+
+            int wordComparisons = minWords * Longs.BYTES;
+            for (int i = 0; i < wordComparisons ; i += Longs.BYTES)
+            {
+                long lw = theUnsafe.getLong(buffer1, memoryOffset1 + (long) i);
+                long rw = theUnsafe.getLong(buffer2, memoryOffset2 + (long) i);
+                long diff = lw ^ rw;
+
+                if (diff != 0)
+                {
+                    if (!littleEndian)
+                        return lessThanUnsigned(lw, rw) ? -1 : 1;
+
+                    // Use binary search
+                    int n = 0;
+                    int y;
+                    int x = (int) diff;
+                    if (x == 0)
+                    {
+                        x = (int) (diff >>> 32);
+                        n = 32;
+                    }
+
+                    y = x << 16;
+                    if (y == 0)
+                        n += 16;
+                    else
+                        x = y;
+
+                    y = x << 8;
+                    if (y == 0)
+                        n += 8;
+                    return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+                }
+            }
+
+            // The epilogue to cover the last (minLength % 8) elements.
+            for (int i = minWords * Longs.BYTES; i < minLength; i++)
+            {
+                int result = UnsignedBytes.compare(theUnsafe.getByte(buffer1, memoryOffset1 + i),
+                                                   theUnsafe.getByte(buffer2, memoryOffset2 + i));
+                if (result != 0)
+                    return result;
+            }
+            return length1 - length2;
+        }
+
+    }
+
+    @SuppressWarnings("unused")
+    public static final class PureJavaOperations implements ByteOperations
+    {
+        @Override
+        public int compare(byte[] buffer1, int offset1, int length1,
+                           byte[] buffer2, int offset2, int length2)
+        {
+            // Short circuit equal case
+            if (buffer1 == buffer2 && offset1 == offset2 && length1 == length2)
+                return 0;
+
+            int end1 = offset1 + length1;
+            int end2 = offset2 + length2;
+            for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++)
+            {
+                int a = (buffer1[i] & 0xff);
+                int b = (buffer2[j] & 0xff);
+                if (a != b)
+                {
+                    return a - b;
+                }
+            }
+            return length1 - length2;
+        }
+
+        public int compare(ByteBuffer buffer1, byte[] buffer2, int offset2, int length2)
+        {
+            if (buffer1.hasArray())
+                return compare(buffer1.array(), buffer1.arrayOffset() + buffer1.position(), buffer1.remaining(),
+                               buffer2, offset2, length2);
+            return compare(buffer1, ByteBuffer.wrap(buffer2, offset2, length2));
+        }
+
+        public int compare(ByteBuffer buffer1, ByteBuffer buffer2)
+        {
+            int end1 = buffer1.limit();
+            int end2 = buffer2.limit();
+            for (int i = buffer1.position(), j = buffer2.position(); i < end1 && j < end2; i++, j++)
+            {
+                int a = (buffer1.get(i) & 0xff);
+                int b = (buffer2.get(j) & 0xff);
+                if (a != b)
+                {
+                    return a - b;
+                }
+            }
+            return buffer1.remaining() - buffer2.remaining();
+        }
+
+        public void copy(ByteBuffer src, int srcPosition, byte[] trg, int trgPosition, int length)
+        {
+            if (src.hasArray())
+            {
+                System.arraycopy(src.array(), src.arrayOffset() + srcPosition, trg, trgPosition, length);
+                return;
+            }
+            src = src.duplicate();
+            src.position(srcPosition);
+            src.get(trg, trgPosition, length);
+        }
+
+        public void copy(ByteBuffer src, int srcPosition, ByteBuffer trg, int trgPosition, int length)
+        {
+            if (src.hasArray() && trg.hasArray())
+            {
+                System.arraycopy(src.array(), src.arrayOffset() + srcPosition, trg.array(), trg.arrayOffset() + trgPosition, length);
+                return;
+            }
+            src = src.duplicate();
+            src.position(srcPosition).limit(srcPosition + length);
+            trg = trg.duplicate();
+            trg.position(trgPosition);
+            trg.put(src);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java
index ea81b11..757e8dd 100644
--- a/src/java/org/apache/cassandra/utils/FilterFactory.java
+++ b/src/java/org/apache/cassandra/utils/FilterFactory.java
@@ -18,9 +18,9 @@
 package org.apache.cassandra.utils;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.obs.IBitSet;
 import org.apache.cassandra.utils.obs.OffHeapBitSet;
 import org.apache.cassandra.utils.obs.OpenBitSet;
@@ -35,7 +35,7 @@ public class FilterFactory
     private static final Logger logger = LoggerFactory.getLogger(FilterFactory.class);
     private static final long BITSET_EXCESS = 20;
 
-    public static void serialize(IFilter bf, DataOutput output) throws IOException
+    public static void serialize(IFilter bf, DataOutputPlus output) throws IOException
     {
         Murmur3BloomFilter.serializer.serialize((Murmur3BloomFilter) bf, output);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/IntervalTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IntervalTree.java b/src/java/org/apache/cassandra/utils/IntervalTree.java
index 2b81516..3755c54 100644
--- a/src/java/org/apache/cassandra/utils/IntervalTree.java
+++ b/src/java/org/apache/cassandra/utils/IntervalTree.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.utils;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -34,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class IntervalTree<C, D, I extends Interval<C, D>> implements Iterable<I>
 {
@@ -91,7 +91,7 @@ public class IntervalTree<C, D, I extends Interval<C, D>> implements Iterable<I>
 
     public static <C, D, I extends Interval<C, D>> Serializer<C, D, I> serializer(ISerializer<C> pointSerializer, ISerializer<D> dataSerializer, Constructor<I> constructor)
     {
-        return new Serializer(pointSerializer, dataSerializer, constructor);
+        return new Serializer<>(pointSerializer, dataSerializer, constructor);
     }
 
     @SuppressWarnings("unchecked")
@@ -390,7 +390,7 @@ public class IntervalTree<C, D, I extends Interval<C, D>> implements Iterable<I>
             this.constructor = constructor;
         }
 
-        public void serialize(IntervalTree<C, D, I> it, DataOutput out, int version) throws IOException
+        public void serialize(IntervalTree<C, D, I> it, DataOutputPlus out, int version) throws IOException
         {
             out.writeInt(it.count);
             for (Interval<C, D> interval : it)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index ce71ec4..8e6d5c0 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.utils;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
@@ -33,6 +32,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
  * A MerkleTree implemented as a binary tree.
@@ -78,7 +78,7 @@ public class MerkleTree implements Serializable
 
     public static class MerkleTreeSerializer implements IVersionedSerializer<MerkleTree>
     {
-        public void serialize(MerkleTree mt, DataOutput out, int version) throws IOException
+        public void serialize(MerkleTree mt, DataOutputPlus out, int version) throws IOException
         {
             out.writeByte(mt.hashdepth);
             out.writeLong(mt.maxsize);
@@ -813,7 +813,7 @@ public class MerkleTree implements Serializable
 
         private static class InnerSerializer implements IVersionedSerializer<Inner>
         {
-            public void serialize(Inner inner, DataOutput out, int version) throws IOException
+            public void serialize(Inner inner, DataOutputPlus out, int version) throws IOException
             {
                 if (inner.hash == null)
                     out.writeInt(-1);
@@ -894,7 +894,7 @@ public class MerkleTree implements Serializable
 
         private static class LeafSerializer implements IVersionedSerializer<Leaf>
         {
-            public void serialize(Leaf leaf, DataOutput out, int version) throws IOException
+            public void serialize(Leaf leaf, DataOutputPlus out, int version) throws IOException
             {
                 if (leaf.hash == null)
                 {
@@ -1035,7 +1035,7 @@ public class MerkleTree implements Serializable
 
         private static class HashableSerializer implements IVersionedSerializer<Hashable>
         {
-            public void serialize(Hashable h, DataOutput out, int version) throws IOException
+            public void serialize(Hashable h, DataOutputPlus out, int version) throws IOException
             {
                 if (h instanceof Inner)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/PureJavaCrc32.java b/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
index da03986..1cdefd7 100644
--- a/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
+++ b/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
@@ -15,7 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.cassandra.utils;
+package org.apache.cassandra.utils;
+import java.nio.ByteBuffer;
 import java.util.zip.Checksum;
 
 /**
@@ -29,7 +30,10 @@ import java.util.zip.Checksum;
  * java.util.zip.CRC32 in Java 1.6
  *
  * @see java.util.zip.CRC32
- *
 * This class is copied from hadoop-commons project.
 * (The initial patch added PureJavaCrc32 was HADOOP-6148)
 */
+ *
+ * This class is copied from hadoop-commons project.
+ * (The initial patch added PureJavaCrc32 was HADOOP-6148)
+ */
 public class PureJavaCrc32 implements Checksum {
 
   /** the current CRC value, bit-flipped */
@@ -91,7 +95,47 @@ public class PureJavaCrc32 implements Checksum {
     crc = localCrc;
   }
 
-  @Override
+    public void update(ByteBuffer b, int off, int len) {
+        int localCrc = crc;
+
+        while(len > 7) {
+            final int c0 =(b.get(off+0) ^ localCrc) & 0xff;
+            final int c1 =(b.get(off+1) ^ (localCrc >>>= 8)) & 0xff;
+            final int c2 =(b.get(off+2) ^ (localCrc >>>= 8)) & 0xff;
+            final int c3 =(b.get(off+3) ^ (localCrc >>>= 8)) & 0xff;
+            localCrc = (T[T8_7_start + c0] ^ T[T8_6_start + c1])
+                       ^ (T[T8_5_start + c2] ^ T[T8_4_start + c3]);
+
+            final int c4 = b.get(off+4) & 0xff;
+            final int c5 = b.get(off+5) & 0xff;
+            final int c6 = b.get(off+6) & 0xff;
+            final int c7 = b.get(off+7) & 0xff;
+
+            localCrc ^= (T[T8_3_start + c4] ^ T[T8_2_start + c5])
+                        ^ (T[T8_1_start + c6] ^ T[T8_0_start + c7]);
+
+            off += 8;
+            len -= 8;
+        }
+
+    /* loop unroll - duff's device style */
+        switch(len) {
+            case 7: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b.get(off++)) & 0xff)];
+            case 6: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b.get(off++)) & 0xff)];
+            case 5: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b.get(off++)) & 0xff)];
+            case 4: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b.get(off++)) & 0xff)];
+            case 3: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b.get(off++)) & 0xff)];
+            case 2: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b.get(off++)) & 0xff)];
+            case 1: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b.get(off++)) & 0xff)];
+            default:
+        /* nothing */
+        }
+
+        // Publish crc out to object
+        crc = localCrc;
+    }
+
+    @Override
   final public void update(int b) {
     crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
   }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index daa702f..3f5a715 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.utils;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.*;
 
@@ -26,6 +25,7 @@ import com.google.common.base.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
  * Histogram that can be constructed from streaming of data.
@@ -170,7 +170,7 @@ public class StreamingHistogram
 
     public static class StreamingHistogramSerializer implements ISerializer<StreamingHistogram>
     {
-        public void serialize(StreamingHistogram histogram, DataOutput out) throws IOException
+        public void serialize(StreamingHistogram histogram, DataOutputPlus out) throws IOException
         {
             out.writeInt(histogram.maxBinSize);
             Map<Double, Long> entries = histogram.getAsMap();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/src/java/org/apache/cassandra/utils/UUIDSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDSerializer.java b/src/java/org/apache/cassandra/utils/UUIDSerializer.java
index afaed92..2aa2b4e 100644
--- a/src/java/org/apache/cassandra/utils/UUIDSerializer.java
+++ b/src/java/org/apache/cassandra/utils/UUIDSerializer.java
@@ -18,18 +18,18 @@
 package org.apache.cassandra.utils;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class UUIDSerializer implements IVersionedSerializer<UUID>
 {
     public static UUIDSerializer serializer = new UUIDSerializer();
 
-    public void serialize(UUID uuid, DataOutput out, int version) throws IOException
+    public void serialize(UUID uuid, DataOutputPlus out, int version) throws IOException
     {
         out.writeLong(uuid.getMostSignificantBits());
         out.writeLong(uuid.getLeastSignificantBits());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index e9a6513..1c97eae 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -19,13 +19,12 @@
  */
 package org.apache.cassandra;
 
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.net.MessagingService;
 
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -54,9 +53,9 @@ public class AbstractSerializationsTester extends SchemaLoader
 
     protected <T> void testSerializedSize(T obj, IVersionedSerializer<T> serializer) throws IOException
     {
-        ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        DataOutputBuffer out = new DataOutputBuffer();
         serializer.serialize(obj, out, getVersion());
-        assert out.toByteArray().length == serializer.serializedSize(obj, getVersion());
+        assert out.getLength() == serializer.serializedSize(obj, getVersion());
     }
 
     protected static DataInputStream getInput(String name) throws IOException
@@ -66,10 +65,10 @@ public class AbstractSerializationsTester extends SchemaLoader
         return new DataInputStream(new FileInputStream(f));
     }
 
-    protected static DataOutputStream getOutput(String name) throws IOException
+    protected static DataOutputStreamAndChannel getOutput(String name) throws IOException
     {
         File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
         f.getParentFile().mkdirs();
-        return new DataOutputStream(new FileOutputStream(f));
+        return new DataOutputStreamAndChannel(new FileOutputStream(f));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 6bf9e87..b74f2c9 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
@@ -314,12 +315,11 @@ public class Util
     {
         try
         {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutputStream out = new DataOutputStream(baos);
+            DataOutputBuffer out = new DataOutputBuffer();
             DeletionTime.serializer.serialize(cf.deletionInfo().getTopLevelDeletion(), out);
             out.writeInt(cf.getColumnCount());
             new ColumnIndex.Builder(cf, ByteBufferUtil.EMPTY_BYTE_BUFFER, out).build(cf);
-            return ByteBuffer.wrap(baos.toByteArray());
+            return ByteBuffer.wrap(out.toByteArray());
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index 2cd9f81..45ce347 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.net.CallbackInfo;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -74,7 +75,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, nonEmptyRangeSCPred, bounds, 100);
         MessageOut<RangeSliceCommand> regRangeCmdSupMsg = regRangeCmdSup.createMessage();
 
-        DataOutputStream out = getOutput("db.RangeSliceCommand.bin");
+        DataOutputStreamAndChannel out = getOutput("db.RangeSliceCommand.bin");
         namesCmdMsg.serialize(out, getVersion());
         emptyRangeCmdMsg.serialize(out, getVersion());
         regRangeCmdMsg.serialize(out, getVersion());
@@ -109,7 +110,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, namesPred);
         SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, namesSCPred);
 
-        DataOutputStream out = getOutput("db.SliceByNamesReadCommand.bin");
+        DataOutputStreamAndChannel out = getOutput("db.SliceByNamesReadCommand.bin");
         SliceByNamesReadCommand.serializer.serialize(standardCmd, out, getVersion());
         SliceByNamesReadCommand.serializer.serialize(superCmd, out, getVersion());
         ReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -144,7 +145,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         SliceFromReadCommand standardCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, nonEmptyRangePred);
         SliceFromReadCommand superCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, nonEmptyRangeSCPred);
         
-        DataOutputStream out = getOutput("db.SliceFromReadCommand.bin");
+        DataOutputStreamAndChannel out = getOutput("db.SliceFromReadCommand.bin");
         SliceFromReadCommand.serializer.serialize(standardCmd, out, getVersion());
         SliceFromReadCommand.serializer.serialize(superCmd, out, getVersion());
         ReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -177,7 +178,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testRowWrite() throws IOException
     {
-        DataOutputStream out = getOutput("db.Row.bin");
+        DataOutputStreamAndChannel out = getOutput("db.Row.bin");
         Row.serializer.serialize(statics.StandardRow, out, getVersion());
         Row.serializer.serialize(statics.SuperRow, out, getVersion());
         Row.serializer.serialize(statics.NullRow, out, getVersion());
@@ -214,7 +215,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         mods.put(statics.SuperCf.metadata().cfId, statics.SuperCf);
         Mutation mixedRm = new Mutation(statics.KS, statics.Key, mods);
 
-        DataOutputStream out = getOutput("db.RowMutation.bin");
+        DataOutputStreamAndChannel out = getOutput("db.RowMutation.bin");
         Mutation.serializer.serialize(standardRowRm, out, getVersion());
         Mutation.serializer.serialize(superRowRm, out, getVersion());
         Mutation.serializer.serialize(standardRm, out, getVersion());
@@ -263,7 +264,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         Truncation tr = new Truncation(statics.KS, "Doesn't Really Matter");
         TruncateResponse aff = new TruncateResponse(statics.KS, "Doesn't Matter Either", true);
         TruncateResponse neg = new TruncateResponse(statics.KS, "Still Doesn't Matter", false);
-        DataOutputStream out = getOutput("db.Truncation.bin");
+        DataOutputStreamAndChannel out = getOutput("db.Truncation.bin");
         Truncation.serializer.serialize(tr, out, getVersion());
         TruncateResponse.serializer.serialize(aff, out, getVersion());
         TruncateResponse.serializer.serialize(neg, out, getVersion());
@@ -305,7 +306,7 @@ public class SerializationsTest extends AbstractSerializationsTester
     {
         WriteResponse aff = new WriteResponse();
         WriteResponse neg = new WriteResponse();
-        DataOutputStream out = getOutput("db.WriteResponse.bin");
+        DataOutputStreamAndChannel out = getOutput("db.WriteResponse.bin");
         WriteResponse.serializer.serialize(aff, out, getVersion());
         WriteResponse.serializer.serialize(neg, out, getVersion());
         out.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 7e0008d..6317a98 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.gms;
 
 import org.apache.cassandra.AbstractSerializationsTester;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.junit.Test;
@@ -38,7 +39,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 {
     private void testEndpointStateWrite() throws IOException
     {
-        DataOutputStream out = getOutput("gms.EndpointState.bin");
+        DataOutputStreamAndChannel out = getOutput("gms.EndpointState.bin");
         HeartBeatState.serializer.serialize(Statics.HeartbeatSt, out, getVersion());
         EndpointState.serializer.serialize(Statics.EndpointSt, out, getVersion());
         VersionedValue.serializer.serialize(Statics.vv0, out, getVersion());
@@ -75,7 +76,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         GossipDigestAck2 ack2 = new GossipDigestAck2(states);
         GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name", StorageService.getPartitioner().getClass().getCanonicalName(), Statics.Digests);
 
-        DataOutputStream out = getOutput("gms.Gossip.bin");
+        DataOutputStreamAndChannel out = getOutput("gms.Gossip.bin");
         for (GossipDigest gd : Statics.Digests)
             GossipDigest.serializer.serialize(gd, out, getVersion());
         GossipDigestAck.serializer.serialize(ack, out, getVersion());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 0583e95..edaa5ba 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -72,14 +73,13 @@ public class IndexSummaryTest
     public void testSerialization() throws IOException
     {
         Pair<List<DecoratedKey>, IndexSummary> random = generateRandomIndex(100, 1);
-        ByteArrayOutputStream aos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(aos);
+        DataOutputBuffer dos = new DataOutputBuffer();
         IndexSummary.serializer.serialize(random.right, dos, false);
         // write junk
         dos.writeUTF("JUNK");
         dos.writeUTF("JUNK");
         FileUtils.closeQuietly(dos);
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray()));
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
         IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner(), false, 1, 1);
         for (int i = 0; i < 100; i++)
             assertEquals(i, is.binarySearch(random.left.get(i)));
@@ -100,10 +100,9 @@ public class IndexSummaryTest
         assertEquals(0, summary.getPosition(0));
         assertArrayEquals(new byte[0], summary.getKey(0));
 
-        ByteArrayOutputStream aos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(aos);
+        DataOutputBuffer dos = new DataOutputBuffer();
         IndexSummary.serializer.serialize(summary, dos, false);
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray()));
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
         IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1, 1);
 
         assertEquals(1, loaded.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 6b66746..da0e31a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.EstimatedHistogram;
 
@@ -70,7 +71,7 @@ public class MetadataSerializerTest
         MetadataSerializer serializer = new MetadataSerializer();
         // Serialize to tmp file
         File statsFile = File.createTempFile(Component.STATS.name, null);
-        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(statsFile)))
+        try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(statsFile)))
         {
             serializer.serialize(originalMetadata, out);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
new file mode 100644
index 0000000..4eeec4d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.avro.util.ByteBufferInputStream;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class DataOutputTest
+{
+
+    @Test
+    public void testDataOutputStreamPlus() throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStreamPlus write = new DataOutputStreamPlus(bos);
+        DataInput canon = testWrite(write);
+        DataInput test = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        testRead(test, canon);
+    }
+
+    @Test
+    public void testDataOutputChannelAndChannel() throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStreamPlus write = new DataOutputStreamAndChannel(Channels.newChannel(bos));
+        DataInput canon = testWrite(write);
+        DataInput test = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        testRead(test, canon);
+    }
+
+    @Test
+    public void testDataOutputBuffer() throws IOException
+    {
+        DataOutputBuffer write = new DataOutputBuffer();
+        DataInput canon = testWrite(write);
+        DataInput test = new DataInputStream(new ByteArrayInputStream(write.toByteArray()));
+        testRead(test, canon);
+    }
+
+    @Test
+    public void testDataOutputDirectByteBuffer() throws IOException
+    {
+        ByteBuffer buf = wrap(new byte[345], true);
+        DataOutputByteBuffer write = new DataOutputByteBuffer(buf.duplicate());
+        DataInput canon = testWrite(write);
+        DataInput test = new DataInputStream(new ByteBufferInputStream(Arrays.asList(buf)));
+        testRead(test, canon);
+    }
+
+    @Test
+    public void testDataOutputHeapByteBuffer() throws IOException
+    {
+        ByteBuffer buf = wrap(new byte[345], false);
+        DataOutputByteBuffer write = new DataOutputByteBuffer(buf.duplicate());
+        DataInput canon = testWrite(write);
+        DataInput test = new DataInputStream(new ByteBufferInputStream(Arrays.asList(buf)));
+        testRead(test, canon);
+    }
+
+    @Test
+    public void testFileOutputStream() throws IOException
+    {
+        File file = FileUtils.createTempFile("dataoutput", "test");
+        try
+        {
+            DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(new FileOutputStream(file));
+            DataInput canon = testWrite(write);
+            write.close();
+            DataInput test = new DataInputStream(new FileInputStream(file));
+            testRead(test, canon);
+        }
+        finally
+        {
+            Assert.assertTrue(file.delete());
+        }
+    }
+
+    @Test
+    public void testRandomAccessFile() throws IOException
+    {
+        File file = FileUtils.createTempFile("dataoutput", "test");
+        try
+        {
+            final RandomAccessFile raf = new RandomAccessFile(file, "rw");
+            DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(Channels.newOutputStream(raf.getChannel()), raf.getChannel());
+            DataInput canon = testWrite(write);
+            write.close();
+            DataInput test = new DataInputStream(new FileInputStream(file));
+            testRead(test, canon);
+        }
+        finally
+        {
+            Assert.assertTrue(file.delete());
+        }
+    }
+
+    @Test
+    public void testSequentialWriter() throws IOException
+    {
+        File file = FileUtils.createTempFile("dataoutput", "test");
+        final SequentialWriter writer = new SequentialWriter(file, 32, true);
+        DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(writer, writer);
+        DataInput canon = testWrite(write);
+        write.flush();
+        write.close();
+        DataInput test = new DataInputStream(new FileInputStream(file));
+        testRead(test, canon);
+        Assert.assertTrue(file.delete());
+    }
+
+    private DataInput testWrite(DataOutputPlus test) throws IOException
+    {
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        final DataOutput canon = new DataOutputStream(bos);
+        Random rnd = ThreadLocalRandom.current();
+
+        byte[] bytes = new byte[50];
+        rnd.nextBytes(bytes);
+        ByteBufferUtil.writeWithLength(bytes, test);
+        ByteBufferUtil.writeWithLength(bytes, canon);
+
+        bytes = new byte[50];
+        rnd.nextBytes(bytes);
+        ByteBufferUtil.writeWithLength(wrap(bytes, false), test);
+        ByteBufferUtil.writeWithLength(bytes, canon);
+
+        bytes = new byte[50];
+        rnd.nextBytes(bytes);
+        ByteBufferUtil.writeWithLength(wrap(bytes, true), test);
+        ByteBufferUtil.writeWithLength(bytes, canon);
+
+        bytes = new byte[50];
+        rnd.nextBytes(bytes);
+        ByteBufferUtil.writeWithShortLength(bytes, test);
+        ByteBufferUtil.writeWithShortLength(bytes, canon);
+
+        bytes = new byte[50];
+        rnd.nextBytes(bytes);
+        ByteBufferUtil.writeWithShortLength(wrap(bytes, false), test);
+        ByteBufferUtil.writeWithShortLength(bytes, canon);
+
+        bytes = new byte[50];
+        rnd.nextBytes(bytes);
+        ByteBufferUtil.writeWithShortLength(wrap(bytes, true), test);
+        ByteBufferUtil.writeWithShortLength(bytes, canon);
+        // 318
+
+        {
+            long v = rnd.nextLong();
+            test.writeLong(v);
+            canon.writeLong(v);
+        }
+        {
+            int v = rnd.nextInt();
+            test.writeInt(v);
+            canon.writeInt(v);
+        }
+        {
+            short v = (short) rnd.nextInt();
+            test.writeShort(v);
+            canon.writeShort(v);
+        }
+        {
+            byte v = (byte) rnd.nextInt();
+            test.write(v);
+            canon.write(v);
+        }
+        {
+            double v = rnd.nextDouble();
+            test.writeDouble(v);
+            canon.writeDouble(v);
+        }
+        {
+            float v = (float) rnd.nextDouble();
+            test.writeFloat(v);
+            canon.writeFloat(v);
+        }
+
+        // 27
+        return new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+    }
+
+    private void testRead(DataInput test, DataInput canon) throws IOException
+    {
+        Assert.assertEquals(ByteBufferUtil.readWithLength(canon), ByteBufferUtil.readWithLength(test));
+        Assert.assertEquals(ByteBufferUtil.readWithLength(canon), ByteBufferUtil.readWithLength(test));
+        Assert.assertEquals(ByteBufferUtil.readWithLength(canon), ByteBufferUtil.readWithLength(test));
+        Assert.assertEquals(ByteBufferUtil.readWithShortLength(canon), ByteBufferUtil.readWithShortLength(test));
+        Assert.assertEquals(ByteBufferUtil.readWithShortLength(canon), ByteBufferUtil.readWithShortLength(test));
+        Assert.assertEquals(ByteBufferUtil.readWithShortLength(canon), ByteBufferUtil.readWithShortLength(test));
+        assert test.readLong() == canon.readLong();
+        assert test.readInt() == canon.readInt();
+        assert test.readShort() == canon.readShort();
+        assert test.readByte() == canon.readByte();
+        assert test.readDouble() == canon.readDouble();
+        assert test.readFloat() == canon.readFloat();
+        try
+        {
+            test.readInt();
+            assert false;
+        }
+        catch (EOFException _)
+        {
+        }
+    }
+
+    private static ByteBuffer wrap(byte[] bytes, boolean direct)
+    {
+        ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bytes.length + 20) : ByteBuffer.allocate(bytes.length + 20);
+        buf.position(10);
+        buf.limit(bytes.length + 10);
+        buf.duplicate().put(bytes);
+        return buf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 05936e2..8da900a 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.security.MessageDigest;
@@ -36,6 +35,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -127,7 +127,7 @@ public class ValidatorTest extends SchemaLoader
             super(key);
         }
 
-        public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
+        public RowIndexEntry write(long currentPosition, DataOutputPlus out) throws IOException
         {
             throw new UnsupportedOperationException();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index f1d23f0..6937ceb 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.NodePair;
@@ -54,7 +55,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException
     {
-        try (DataOutputStream out = getOutput(fileName))
+        try (DataOutputStreamAndChannel out = getOutput(fileName))
         {
             for (RepairMessage message : messages)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
index f0dc602..4180a8c 100644
--- a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
+++ b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
@@ -36,6 +36,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.io.util.FileUtils;
 
 public class BloomFilterTest
@@ -162,7 +163,7 @@ public class BloomFilterTest
         File file = FileUtils.createTempFile("bloomFilterTest-", ".dat");
         BloomFilter filter = (BloomFilter) FilterFactory.getFilter(((long)Integer.MAX_VALUE / 8) + 1, 0.01d, true);
         filter.add(test);
-        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+        DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(file));
         FilterFactory.serialize(filter, out);
         filter.bitset.serialize(out);
         out.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75508ec8/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java b/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
index 172cdb1..44cb20d 100644
--- a/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
+++ b/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
@@ -32,6 +32,9 @@ import java.util.Arrays;
 
 import org.junit.Test;
 
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
+
 public class ByteBufferUtilTest
 {
     private static final String s = "cassandra";
@@ -171,12 +174,11 @@ public class ByteBufferUtilTest
 
     private void checkReadWrite(ByteBuffer bb) throws IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(bos);
+        DataOutputBuffer out = new DataOutputBuffer();
         ByteBufferUtil.writeWithLength(bb, out);
         ByteBufferUtil.writeWithShortLength(bb, out);
 
-        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
         assert bb.equals(ByteBufferUtil.readWithLength(in));
         assert bb.equals(ByteBufferUtil.readWithShortLength(in));
     }


Mime
View raw message