cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [08/10] cassandra git commit: Faster sequential IO (CASSANDRA-8630)
Date Fri, 04 Sep 2015 11:46:29 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
deleted file mode 100644
index 29ce2c3..0000000
--- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.*;
-
-public abstract class AbstractDataInput extends InputStream implements DataInputPlus
-{
-    public abstract void seek(long position) throws IOException;
-    public abstract long getPosition();
-    public abstract long getPositionLimit();
-
-    public int skipBytes(int n) throws IOException
-    {
-        if (n <= 0)
-            return 0;
-        long oldPosition = getPosition();
-        seek(Math.min(getPositionLimit(), oldPosition + n));
-        long skipped = getPosition() - oldPosition;
-        assert skipped >= 0 && skipped <= n;
-        return (int) skipped;
-    }
-
-    /**
-     * Reads a boolean from the current position in this file. Blocks until one
-     * byte has been read, the end of the file is reached or an exception is
-     * thrown.
-     *
-     * @return the next boolean value from this file.
-     * @throws java.io.EOFException
-     *             if the end of this file is detected.
-     * @throws java.io.IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final boolean readBoolean() throws IOException {
-        int temp = this.read();
-        if (temp < 0) {
-            throw new EOFException();
-        }
-        return temp != 0;
-    }
-
-    /**
-     * Reads an 8-bit byte from the current position in this file. Blocks until
-     * one byte has been read, the end of the file is reached or an exception is
-     * thrown.
-     *
-     * @return the next signed 8-bit byte value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final byte readByte() throws IOException {
-        int temp = this.read();
-        if (temp < 0) {
-            throw new EOFException();
-        }
-        return (byte) temp;
-    }
-
-    /**
-     * Reads a 16-bit character from the current position in this file. Blocks until
-     * two bytes have been read, the end of the file is reached or an exception is
-     * thrown.
-     *
-     * @return the next char value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final char readChar() throws IOException {
-        int ch1 = this.read();
-        int ch2 = this.read();
-        if ((ch1 | ch2) < 0)
-            throw new EOFException();
-        return (char)((ch1 << 8) + (ch2 << 0));
-    }
-
-    /**
-     * Reads a 64-bit double from the current position in this file. Blocks
-     * until eight bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next double value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final double readDouble() throws IOException {
-        return Double.longBitsToDouble(readLong());
-    }
-
-    /**
-     * Reads a 32-bit float from the current position in this file. Blocks
-     * until four bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next float value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final float readFloat() throws IOException {
-        return Float.intBitsToFloat(readInt());
-    }
-
-    /**
-     * Reads bytes from this file into {@code buffer}. Blocks until {@code
-     * buffer.length} number of bytes have been read, the end of the file is
-     * reached or an exception is thrown.
-     *
-     * @param buffer
-     *            the buffer to read bytes into.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     * @throws NullPointerException
-     *             if {@code buffer} is {@code null}.
-     */
-    public void readFully(byte[] buffer) throws IOException
-    {
-        readFully(buffer, 0, buffer.length);
-    }
-
-    /**
-     * Read bytes from this file into {@code buffer} starting at offset {@code
-     * offset}. This method blocks until {@code count} number of bytes have been
-     * read.
-     *
-     * @param buffer
-     *            the buffer to read bytes into.
-     * @param offset
-     *            the initial position in {@code buffer} to store the bytes read
-     *            from this file.
-     * @param count
-     *            the maximum number of bytes to store in {@code buffer}.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IndexOutOfBoundsException
-     *             if {@code offset < 0} or {@code count < 0}, or if {@code
-     *             offset + count} is greater than the length of {@code buffer}.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     * @throws NullPointerException
-     *             if {@code buffer} is {@code null}.
-     */
-    public void readFully(byte[] buffer, int offset, int count) throws IOException
-    {
-        if (buffer == null) {
-            throw new NullPointerException();
-        }
-        // avoid int overflow
-        if (offset < 0 || offset > buffer.length || count < 0
-                || count > buffer.length - offset) {
-            throw new IndexOutOfBoundsException();
-        }
-        while (count > 0) {
-            int result = read(buffer, offset, count);
-            if (result < 0) {
-                throw new EOFException();
-            }
-            offset += result;
-            count -= result;
-        }
-    }
-
-    /**
-     * Reads a 32-bit integer from the current position in this file. Blocks
-     * until four bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next int value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public int readInt() throws IOException {
-        int ch1 = this.read();
-        int ch2 = this.read();
-        int ch3 = this.read();
-        int ch4 = this.read();
-        if ((ch1 | ch2 | ch3 | ch4) < 0)
-            throw new EOFException();
-        return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
-    }
-
-    /**
-     * Reads a line of text form the current position in this file. A line is
-     * represented by zero or more characters followed by {@code '\n'}, {@code
-     * '\r'}, {@code "\r\n"} or the end of file marker. The string does not
-     * include the line terminating sequence.
-     * <p>
-     * Blocks until a line terminating sequence has been read, the end of the
-     * file is reached or an exception is thrown.
-     *
-     * @return the contents of the line or {@code null} if no characters have
-     *         been read before the end of the file has been reached.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final String readLine() throws IOException {
-        StringBuilder line = new StringBuilder(80); // Typical line length
-        boolean foundTerminator = false;
-        long unreadPosition = -1;
-        while (true) {
-            int nextByte = read();
-            switch (nextByte) {
-                case -1:
-                    return line.length() != 0 ? line.toString() : null;
-                case (byte) '\r':
-                    if (foundTerminator) {
-                        seek(unreadPosition);
-                        return line.toString();
-                    }
-                    foundTerminator = true;
-                    /* Have to be able to peek ahead one byte */
-                    unreadPosition = getPosition();
-                    break;
-                case (byte) '\n':
-                    return line.toString();
-                default:
-                    if (foundTerminator) {
-                        seek(unreadPosition);
-                        return line.toString();
-                    }
-                    line.append((char) nextByte);
-            }
-        }
-    }
-
-    /**
-     * Reads a 64-bit long from the current position in this file. Blocks until
-     * eight bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next long value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public long readLong() throws IOException {
-        return ((long)(readInt()) << 32) + (readInt() & 0xFFFFFFFFL);
-    }
-
-    /**
-     * Reads a 16-bit short from the current position in this file. Blocks until
-     * two bytes have been read, the end of the file is reached or an exception
-     * is thrown.
-     *
-     * @return the next short value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public short readShort() throws IOException {
-        int ch1 = this.read();
-        int ch2 = this.read();
-        if ((ch1 | ch2) < 0)
-            throw new EOFException();
-        return (short)((ch1 << 8) + (ch2 << 0));
-    }
-
-    /**
-     * Reads an unsigned 8-bit byte from the current position in this file and
-     * returns it as an integer. Blocks until one byte has been read, the end of
-     * the file is reached or an exception is thrown.
-     *
-     * @return the next unsigned byte value from this file as an int.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final int readUnsignedByte() throws IOException {
-        int temp = this.read();
-        if (temp < 0) {
-            throw new EOFException();
-        }
-        return temp;
-    }
-
-    /**
-     * Reads an unsigned 16-bit short from the current position in this file and
-     * returns it as an integer. Blocks until two bytes have been read, the end of
-     * the file is reached or an exception is thrown.
-     *
-     * @return the next unsigned short value from this file as an int.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public int readUnsignedShort() throws IOException {
-        int ch1 = this.read();
-        int ch2 = this.read();
-        if ((ch1 | ch2) < 0)
-            throw new EOFException();
-        return (ch1 << 8) + (ch2 << 0);
-    }
-
-    /**
-     * Reads a string that is encoded in {@link java.io.DataInput modified UTF-8} from
-     * this file. The number of bytes that must be read for the complete string
-     * is determined by the first two bytes read from the file. Blocks until all
-     * required bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next string encoded in {@link java.io.DataInput modified UTF-8} from
-     *         this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     * @throws java.io.UTFDataFormatException
-     *             if the bytes read cannot be decoded into a character string.
-     */
-    public final String readUTF() throws IOException {
-        return DataInputStream.readUTF(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 744e828..090c5bd 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -29,25 +29,8 @@ public class BufferedSegmentedFile extends SegmentedFile
         super(copy);
     }
 
-    private static class Cleanup extends SegmentedFile.Cleanup
-    {
-        protected Cleanup(ChannelProxy channel)
-        {
-            super(channel);
-        }
-        public void tidy()
-        {
-            super.tidy();
-        }
-    }
-
     public static class Builder extends SegmentedFile.Builder
     {
-        public void addPotentialBoundary(long boundary)
-        {
-            // only one segment in a standard-io file
-        }
-
         public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
         {
             long length = overrideLength > 0 ? overrideLength : channel.size();
@@ -55,13 +38,6 @@ public class BufferedSegmentedFile extends SegmentedFile
         }
     }
 
-    public FileDataInput getSegment(long position)
-    {
-        RandomAccessReader reader = RandomAccessReader.open(channel, bufferSize, -1L);
-        reader.seek(position);
-        return reader;
-    }
-
     public BufferedSegmentedFile sharedCopy()
     {
         return new BufferedSegmentedFile(this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
deleted file mode 100644
index bf926e9..0000000
--- a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class ByteBufferDataInput extends AbstractDataInput implements FileDataInput, DataInput
-{
-    private final ByteBuffer buffer;
-    private final String filename;
-    private final long segmentOffset;
-    private int position;
-
-    public ByteBufferDataInput(ByteBuffer buffer, String filename, long segmentOffset, int position)
-    {
-        assert buffer != null;
-        this.buffer = buffer;
-        this.filename = filename;
-        this.segmentOffset = segmentOffset;
-        this.position = position;
-    }
-
-    // Only use when we know the seek in within the mapped segment. Throws an
-    // IOException otherwise.
-    public void seek(long pos) throws IOException
-    {
-        long inSegmentPos = pos - segmentOffset;
-        if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
-            throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity()));
-
-        position = (int) inSegmentPos;
-    }
-
-    public long getFilePointer()
-    {
-        return segmentOffset + position;
-    }
-
-    public long getPosition()
-    {
-        return segmentOffset + position;
-    }
-
-    public long getPositionLimit()
-    {
-        return segmentOffset + buffer.capacity();
-    }
-
-    @Override
-    public boolean markSupported()
-    {
-        return false;
-    }
-
-    public void reset(FileMark mark) throws IOException
-    {
-        assert mark instanceof MappedFileDataInputMark;
-        position = ((MappedFileDataInputMark) mark).position;
-    }
-
-    public FileMark mark()
-    {
-        return new MappedFileDataInputMark(position);
-    }
-
-    public long bytesPastMark(FileMark mark)
-    {
-        assert mark instanceof MappedFileDataInputMark;
-        assert position >= ((MappedFileDataInputMark) mark).position;
-        return position - ((MappedFileDataInputMark) mark).position;
-    }
-
-    public boolean isEOF() throws IOException
-    {
-        return position == buffer.capacity();
-    }
-
-    public long bytesRemaining() throws IOException
-    {
-        return buffer.capacity() - position;
-    }
-
-    public String getPath()
-    {
-        return filename;
-    }
-
-    public int read() throws IOException
-    {
-        if (isEOF())
-            return -1;
-        return buffer.get(position++) & 0xFF;
-    }
-
-    /**
-     * Does the same thing as <code>readFully</code> do but without copying data (thread safe)
-     * @param length length of the bytes to read
-     * @return buffer with portion of file content
-     * @throws IOException on any fail of I/O operation
-     */
-    public ByteBuffer readBytes(int length) throws IOException
-    {
-        int remaining = buffer.remaining() - position;
-        if (length > remaining)
-            throw new IOException(String.format("mmap segment underflow; remaining is %d but %d requested",
-                                                remaining, length));
-
-        if (length == 0)
-            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-        ByteBuffer bytes = buffer.duplicate();
-        bytes.position(buffer.position() + position).limit(buffer.position() + position + length);
-        position += length;
-
-        // we have to copy the data in case we unreference the underlying sstable.  See CASSANDRA-3179
-        ByteBuffer clone = ByteBuffer.allocate(bytes.remaining());
-        clone.put(bytes);
-        clone.flip();
-        return clone;
-    }
-
-    @Override
-    public final void readFully(byte[] bytes) throws IOException
-    {
-        ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, 0, bytes.length);
-        position += bytes.length;
-    }
-
-    @Override
-    public final void readFully(byte[] bytes, int offset, int count) throws IOException
-    {
-        ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, offset, count);
-        position += count;
-    }
-
-    private static class MappedFileDataInputMark implements FileMark
-    {
-        int position;
-
-        MappedFileDataInputMark(int position)
-        {
-            this.position = position;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + "(" +
-               "filename='" + filename + "'" +
-               ", position=" + position +
-               ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ChannelProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChannelProxy.java b/src/java/org/apache/cassandra/io/util/ChannelProxy.java
index 79954a5..f866160 100644
--- a/src/java/org/apache/cassandra/io/util/ChannelProxy.java
+++ b/src/java/org/apache/cassandra/io/util/ChannelProxy.java
@@ -63,7 +63,7 @@ public final class ChannelProxy extends SharedCloseableImpl
 
     public ChannelProxy(File file)
     {
-        this(file.getAbsolutePath(), openChannel(file));
+        this(file.getPath(), openChannel(file));
     }
 
     public ChannelProxy(String filePath, FileChannel channel)
@@ -87,7 +87,7 @@ public final class ChannelProxy extends SharedCloseableImpl
         final String filePath;
         final FileChannel channel;
 
-        protected Cleanup(String filePath, FileChannel channel)
+        Cleanup(String filePath, FileChannel channel)
         {
             this.filePath = filePath;
             this.channel = channel;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 98869a1..30f1e0c 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -23,43 +23,33 @@ import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Throwables;
 
 public class ChecksummedRandomAccessReader extends RandomAccessReader
 {
     @SuppressWarnings("serial")
     public static class CorruptFileException extends RuntimeException
     {
-        public final File file;
+        public final String filePath;
 
-        public CorruptFileException(Exception cause, File file)
+        public CorruptFileException(Exception cause, String filePath)
         {
             super(cause);
-            this.file = file;
+            this.filePath = filePath;
         }
     }
 
     private final DataIntegrityMetadata.ChecksumValidator validator;
-    private final File file;
 
-    protected ChecksummedRandomAccessReader(File file, ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator)
+    private ChecksummedRandomAccessReader(Builder builder)
     {
-        super(channel, validator.chunkSize, -1, BufferType.ON_HEAP);
-        this.validator = validator;
-        this.file = file;
+        super(builder);
+        this.validator = builder.validator;
     }
 
     @SuppressWarnings("resource")
-    public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException
-    {
-        ChannelProxy channel = new ChannelProxy(file);
-        RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
-        DataIntegrityMetadata.ChecksumValidator validator =
-            new DataIntegrityMetadata.ChecksumValidator(new CRC32(), crcReader, file.getPath());
-        return new ChecksummedRandomAccessReader(file, channel, validator);
-    }
-
     @Override
-    protected void reBuffer()
+    protected void reBufferStandard()
     {
         long desiredPosition = current();
         // align with buffer size, as checksums were computed in chunks of buffer size each.
@@ -84,13 +74,19 @@ public class ChecksummedRandomAccessReader extends RandomAccessReader
         }
         catch (IOException e)
         {
-            throw new CorruptFileException(e, file);
+            throw new CorruptFileException(e, channel.filePath());
         }
 
         buffer.position((int) (desiredPosition - bufferOffset));
     }
 
     @Override
+    protected void reBufferMmap()
+    {
+        throw new AssertionError("Unsupported operation");
+    }
+
+    @Override
     public void seek(long newPosition)
     {
         validator.seek(newPosition);
@@ -100,14 +96,32 @@ public class ChecksummedRandomAccessReader extends RandomAccessReader
     @Override
     public void close()
     {
-        try
+        Throwables.perform(channel.filePath(), Throwables.FileOpType.READ,
+                           super::close,
+                           validator::close,
+                           channel::close);
+    }
+
+    public static final class Builder extends RandomAccessReader.Builder
+    {
+        private final DataIntegrityMetadata.ChecksumValidator validator;
+
+        @SuppressWarnings("resource")
+        public Builder(File file, File crcFile) throws IOException
         {
-            super.close();
+            super(new ChannelProxy(file));
+            this.validator = new DataIntegrityMetadata.ChecksumValidator(new CRC32(),
+                                                                         RandomAccessReader.open(crcFile),
+                                                                         file.getPath());
+
+            super.bufferSize(validator.chunkSize)
+                 .bufferType(BufferType.ON_HEAP);
         }
-        finally
+
+        @Override
+        public RandomAccessReader build()
         {
-            channel.close();
-            validator.close();
+            return new ChecksummedRandomAccessReader(this);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 95c61c1..16f791a 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -17,44 +17,49 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.TreeMap;
-
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.Ref;
 
 public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
 {
-    public final CompressionMetadata metadata;
+    private static final Logger logger = LoggerFactory.getLogger(CompressedSegmentedFile.class);
     private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap;
-    private static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
-    private final TreeMap<Long, MappedByteBuffer> chunkSegments;
+
+    public final CompressionMetadata metadata;
+    private final MmappedRegions regions;
 
     public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata)
     {
-        this(channel, bufferSize, metadata, createMappedSegments(channel, metadata));
+        this(channel,
+             bufferSize,
+             metadata,
+             useMmap
+             ? MmappedRegions.map(channel, metadata)
+             : null);
     }
 
-    public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
+    public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata, MmappedRegions regions)
     {
-        super(new Cleanup(channel, metadata, chunkSegments), channel, bufferSize, metadata.dataLength, metadata.compressedFileLength);
+        super(new Cleanup(channel, metadata, regions), channel, bufferSize, metadata.dataLength, metadata.compressedFileLength);
         this.metadata = metadata;
-        this.chunkSegments = chunkSegments;
+        this.regions = regions;
     }
 
     private CompressedSegmentedFile(CompressedSegmentedFile copy)
     {
         super(copy);
         this.metadata = copy.metadata;
-        this.chunkSegments = copy.chunkSegments;
+        this.regions = copy.regions;
     }
 
     public ChannelProxy channel()
@@ -62,60 +67,36 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
         return channel;
     }
 
-    public TreeMap<Long, MappedByteBuffer> chunkSegments()
+    public MmappedRegions regions()
     {
-        return chunkSegments;
-    }
-
-    static TreeMap<Long, MappedByteBuffer> createMappedSegments(ChannelProxy channel, CompressionMetadata metadata)
-    {
-        if (!useMmap)
-            return null;
-        TreeMap<Long, MappedByteBuffer> chunkSegments = new TreeMap<>();
-        long offset = 0;
-        long lastSegmentOffset = 0;
-        long segmentSize = 0;
-
-        while (offset < metadata.dataLength)
-        {
-            CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
-
-            //Reached a new mmap boundary
-            if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
-            {
-                chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
-                lastSegmentOffset += segmentSize;
-                segmentSize = 0;
-            }
-
-            segmentSize += chunk.length + 4; //checksum
-            offset += metadata.chunkLength();
-        }
-
-        if (segmentSize > 0)
-            chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
-        return chunkSegments;
+        return regions;
     }
 
     private static final class Cleanup extends SegmentedFile.Cleanup
     {
         final CompressionMetadata metadata;
-        final TreeMap<Long, MappedByteBuffer> chunkSegments;
-        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
+        private final MmappedRegions regions;
+
+        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
         {
             super(channel);
             this.metadata = metadata;
-            this.chunkSegments = chunkSegments;
+            this.regions = regions;
         }
         public void tidy()
         {
-            super.tidy();
-            metadata.close();
-            if (chunkSegments != null)
+            Throwable err = regions == null ? null : regions.close(null);
+            if (err != null)
             {
-                for (MappedByteBuffer segment : chunkSegments.values())
-                    FileUtils.clean(segment);
+                JVMStabilityInspector.inspectThrowable(err);
+
+                // This is not supposed to happen
+                logger.error("Error while closing mmapped regions", err);
             }
+
+            metadata.close();
+
+            super.tidy();
         }
     }
 
@@ -132,17 +113,12 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
 
     public static class Builder extends SegmentedFile.Builder
     {
-        protected final CompressedSequentialWriter writer;
+        final CompressedSequentialWriter writer;
         public Builder(CompressedSequentialWriter writer)
         {
             this.writer = writer;
         }
 
-        public void addPotentialBoundary(long boundary)
-        {
-            // only one segment in a standard-io file
-        }
-
         protected CompressionMetadata metadata(String path, long overrideLength)
         {
             if (writer == null)
@@ -166,12 +142,12 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
 
     public RandomAccessReader createReader()
     {
-        return CompressedRandomAccessReader.open(this);
+        return new CompressedRandomAccessReader.Builder(this).build();
     }
 
-    public RandomAccessReader createThrottledReader(RateLimiter limiter)
+    public RandomAccessReader createReader(RateLimiter limiter)
     {
-        return CompressedThrottledReader.open(this, limiter);
+        return new CompressedRandomAccessReader.Builder(this).limiter(limiter).build();
     }
 
     public CompressionMetadata getMetadata()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
index 63091d0..a68dcc2 100644
--- a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
@@ -21,13 +21,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
- * Input stream around a fixed ByteBuffer. Necessary to have this derived class to avoid NIODataInputStream's
- * shuffling of bytes behavior in readNext()
- *
+ * Input stream around a single ByteBuffer.
  */
-public class DataInputBuffer extends NIODataInputStream
+public class DataInputBuffer extends RebufferingInputStream
 {
-
     private static ByteBuffer slice(byte[] buffer, int offset, int length)
     {
         ByteBuffer buf = ByteBuffer.wrap(buffer);
@@ -41,13 +38,12 @@ public class DataInputBuffer extends NIODataInputStream
     }
 
     /**
-     *
-     * @param buf
+     * @param buffer
      * @param duplicate Whether or not to duplicate the buffer to ensure thread safety
      */
-    public DataInputBuffer(ByteBuffer buf, boolean duplicate)
+    public DataInputBuffer(ByteBuffer buffer, boolean duplicate)
     {
-        super(buf, duplicate);
+        super(duplicate ? buffer.duplicate() : buffer);
     }
 
     public DataInputBuffer(byte[] buffer, int offset, int length)
@@ -61,8 +57,14 @@ public class DataInputBuffer extends NIODataInputStream
     }
 
     @Override
-    protected int readNext() throws IOException
+    protected void reBuffer() throws IOException
+    {
+        //nope, we don't rebuffer, we are done!
+    }
+
+    @Override
+    public int available() throws IOException
     {
-        return -1;
+        return buffer.remaining();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 70cd860..01c0049 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -34,6 +34,7 @@ import com.google.common.base.Charsets;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.Throwables;
 
 public class DataIntegrityMetadata
 {
@@ -138,14 +139,8 @@ public class DataIntegrityMetadata
 
         public void close()
         {
-            try
-            {
-                this.digestReader.close();
-            }
-            finally
-            {
-                this.dataReader.close();
-            }
+            Throwables.perform(digestReader::close,
+                               dataReader::close);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/FileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileDataInput.java b/src/java/org/apache/cassandra/io/util/FileDataInput.java
index b63b750..f56193b 100644
--- a/src/java/org/apache/cassandra/io/util/FileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/FileDataInput.java
@@ -19,31 +19,22 @@ package org.apache.cassandra.io.util;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 public interface FileDataInput extends DataInputPlus, Closeable
 {
-    public String getPath();
+    String getPath();
 
-    public boolean isEOF() throws IOException;
+    boolean isEOF() throws IOException;
 
-    public long bytesRemaining() throws IOException;
+    long bytesRemaining() throws IOException;
 
-    public void seek(long pos) throws IOException;
+    void seek(long pos) throws IOException;
 
-    public FileMark mark();
+    FileMark mark();
 
-    public void reset(FileMark mark) throws IOException;
+    void reset(FileMark mark) throws IOException;
 
-    public long bytesPastMark(FileMark mark);
+    long bytesPastMark(FileMark mark);
 
-    public long getFilePointer();
-
-    /**
-     * Read length bytes from current file position
-     * @param length length of the bytes to read
-     * @return buffer with bytes read
-     * @throws IOException if any I/O operation failed
-     */
-    public ByteBuffer readBytes(int length) throws IOException;
+    long getFilePointer();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java b/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
new file mode 100644
index 0000000..425c7d6
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is the same as DataInputBuffer, i.e. a stream for a fixed byte buffer,
+ * except that we also implement FileDataInput by using an offset and a file path.
+ */
+public class FileSegmentInputStream extends DataInputBuffer implements FileDataInput
+{
+    private final String filePath;
+    private final long offset;
+
+    public FileSegmentInputStream(ByteBuffer buffer, String filePath, long offset)
+    {
+        super(buffer, false);
+        this.filePath = filePath;
+        this.offset = offset;
+    }
+
+    public String getPath()
+    {
+        return filePath;
+    }
+
+    private long size()
+    {
+        return offset + buffer.capacity();
+    }
+
+    public boolean isEOF()
+    {
+        return !buffer.hasRemaining();
+    }
+
+    public long bytesRemaining()
+    {
+        return buffer.remaining();
+    }
+
+    public void seek(long pos)
+    {
+        if (pos < 0 || pos > size())
+            throw new IllegalArgumentException(String.format("Unable to seek to position %d in %s (%d bytes) in partial mode",
+                                                             pos,
+                                                             getPath(),
+                                                             size()));
+
+
+        buffer.position((int) (pos - offset));
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return false;
+    }
+
+    public FileMark mark()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void reset(FileMark mark)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public long bytesPastMark(FileMark mark)
+    {
+        return 0;
+    }
+
+    public long getFilePointer()
+    {
+        return offset + buffer.position();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ICompressedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
index ce7b22c..43d37dc 100644
--- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java
+++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
@@ -17,14 +17,11 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.nio.MappedByteBuffer;
-import java.util.TreeMap;
-
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
 public interface ICompressedFile
 {
-    public ChannelProxy channel();
-    public CompressionMetadata getMetadata();
-    public TreeMap<Long, MappedByteBuffer> chunkSegments();
+    ChannelProxy channel();
+    CompressionMetadata getMetadata();
+    MmappedRegions regions();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
index 45261e0..e009528 100644
--- a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
@@ -19,50 +19,58 @@ package org.apache.cassandra.io.util;
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
-public class MemoryInputStream extends AbstractDataInput implements DataInput
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+public class MemoryInputStream extends RebufferingInputStream implements DataInput
 {
     private final Memory mem;
-    private int position = 0;
+    private final int bufferSize;
+    private long offset;
 
-    public MemoryInputStream(Memory mem)
-    {
-        this.mem = mem;
-    }
 
-    public int read() throws IOException
+    public MemoryInputStream(Memory mem)
     {
-        return mem.getByte(position++) & 0xFF;
+        this(mem, Ints.saturatedCast(mem.size));
     }
 
-    public void readFully(byte[] buffer, int offset, int count) throws IOException
+    @VisibleForTesting
+    public MemoryInputStream(Memory mem, int bufferSize)
     {
-        mem.getBytes(position, buffer, offset, count);
-        position += count;
+        super(getByteBuffer(mem.peer, bufferSize));
+        this.mem = mem;
+        this.bufferSize = bufferSize;
+        this.offset = mem.peer + bufferSize;
     }
 
-    public void seek(long pos)
+    @Override
+    protected void reBuffer() throws IOException
     {
-        position = (int) pos;
-    }
+        if (offset - mem.peer >= mem.size())
+            return;
 
-    public long getPosition()
-    {
-        return position;
+        buffer = getByteBuffer(offset, Math.min(bufferSize, Ints.saturatedCast(memRemaining())));
+        offset += buffer.capacity();
     }
 
-    public long getPositionLimit()
+    @Override
+    public int available()
     {
-        return mem.size();
+        return Ints.saturatedCast(buffer.remaining() + memRemaining());
     }
 
-    protected long length()
+    private long memRemaining()
     {
-        return mem.size();
+        return mem.size + mem.peer - offset;
     }
 
-    public void close()
+    private static ByteBuffer getByteBuffer(long offset, int length)
     {
-        // do nothing.
+        return MemoryUtil.getByteBuffer(offset, length).order(ByteOrder.BIG_ENDIAN);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/MmappedRegions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegions.java b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
new file mode 100644
index 0000000..8f6cd92
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
+
+import static java.util.stream.Stream.of;
+import static org.apache.cassandra.utils.Throwables.perform;
+
+public class MmappedRegions extends SharedCloseableImpl
+{
+    /** In a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size */
+    public static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
+
+    /** When we need to grow the arrays, we add this number of region slots */
+    static final int REGION_ALLOC_SIZE = 15;
+
+    /** The original state, which is shared with the tidier and
+     * contains all the regions mapped so far. It also
+     * does the actual mapping. */
+    private final State state;
+
+    /** A copy of the latest state. We update this each time the original state is
+     * updated and we share this with copies. If we are a copy, then this
+     * is null. Copies can only access existing regions, they cannot create
+     * new ones. This is for thread safety and because MmappedRegions is
+     * reference counted, only the original state will be cleaned-up,
+     * therefore only the original state should create new mapped regions.
+     */
+    private volatile State copy;
+
+    private MmappedRegions(ChannelProxy channel, CompressionMetadata metadata, long length)
+    {
+        this(new State(channel), metadata, length);
+    }
+
+    private MmappedRegions(State state, CompressionMetadata metadata, long length)
+    {
+        super(new Tidier(state));
+
+        this.state = state;
+
+        if (metadata != null)
+        {
+            assert length == 0 : "expected no length with metadata";
+            updateState(metadata);
+        }
+        else if (length > 0)
+        {
+            updateState(length);
+        }
+
+        this.copy = new State(state);
+    }
+
+    private MmappedRegions(MmappedRegions original)
+    {
+        super(original);
+        this.state = original.copy;
+    }
+
+    public static MmappedRegions empty(ChannelProxy channel)
+    {
+        return new MmappedRegions(channel, null, 0);
+    }
+
+    public static MmappedRegions map(ChannelProxy channel, CompressionMetadata metadata)
+    {
+        if (metadata == null)
+            throw new IllegalArgumentException("metadata cannot be null");
+
+        return new MmappedRegions(channel, metadata, 0);
+    }
+
+    public static MmappedRegions map(ChannelProxy channel, long length)
+    {
+        if (length <= 0)
+            throw new IllegalArgumentException("Length must be positive");
+
+        return new MmappedRegions(channel, null, length);
+    }
+
+    /**
+     * @return a snapshot of the memory mapped regions. The snapshot can
+     * only use existing regions, it cannot create new ones.
+     */
+    public MmappedRegions sharedCopy()
+    {
+        return new MmappedRegions(this);
+    }
+
+    private boolean isCopy()
+    {
+        return copy == null;
+    }
+
+    public void extend(long length)
+    {
+        if (length < 0)
+            throw new IllegalArgumentException("Length must not be negative");
+
+        assert !isCopy() : "Copies cannot be extended";
+
+        if (length <= state.length)
+            return;
+
+        updateState(length);
+        copy = new State(state);
+    }
+
+    private void updateState(long length)
+    {
+        state.length = length;
+        long pos = state.getPosition();
+        while (pos < length)
+        {
+            long size = Math.min(MAX_SEGMENT_SIZE, length - pos);
+            state.add(pos, size);
+            pos += size;
+        }
+    }
+
+    private void updateState(CompressionMetadata metadata)
+    {
+        long offset = 0;
+        long lastSegmentOffset = 0;
+        long segmentSize = 0;
+
+        while (offset < metadata.dataLength)
+        {
+            CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
+
+            //Reached a new mmap boundary
+            if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
+            {
+                if (segmentSize > 0)
+                {
+                    state.add(lastSegmentOffset, segmentSize);
+                    lastSegmentOffset += segmentSize;
+                    segmentSize = 0;
+                }
+            }
+
+            segmentSize += chunk.length + 4; //checksum
+            offset += metadata.chunkLength();
+        }
+
+        if (segmentSize > 0)
+            state.add(lastSegmentOffset, segmentSize);
+
+        state.length = lastSegmentOffset + segmentSize;
+    }
+
+    public boolean isValid(ChannelProxy channel)
+    {
+        return state.isValid(channel);
+    }
+
+    public boolean isEmpty()
+    {
+        return state.isEmpty();
+    }
+
+    public Region floor(long position)
+    {
+        assert !isCleanedUp() : "Attempted to use closed region";
+        return state.floor(position);
+    }
+
+    public static final class Region
+    {
+        public final long offset;
+        public final ByteBuffer buffer;
+
+        public Region(long offset, ByteBuffer buffer)
+        {
+            this.offset = offset;
+            this.buffer = buffer;
+        }
+
+        public long bottom()
+        {
+            return offset;
+        }
+
+        public long top()
+        {
+            return offset + buffer.capacity();
+        }
+    }
+
+    private static final class State
+    {
+        /** The file channel */
+        private final ChannelProxy channel;
+
+        /** An array of region buffers, synchronized with offsets */
+        private ByteBuffer[] buffers;
+
+        /** An array of region offsets, synchronized with buffers */
+        private long[] offsets;
+
+        /** The maximum file length we have mapped */
+        private long length;
+
+        /** The index to the last region added */
+        private int last;
+
+        private State(ChannelProxy channel)
+        {
+            this.channel = channel.sharedCopy();
+            this.buffers = new ByteBuffer[REGION_ALLOC_SIZE];
+            this.offsets = new long[REGION_ALLOC_SIZE];
+            this.length = 0;
+            this.last = -1;
+        }
+
+        private State(State original)
+        {
+            this.channel = original.channel;
+            this.buffers = original.buffers;
+            this.offsets = original.offsets;
+            this.length = original.length;
+            this.last = original.last;
+        }
+
+        private boolean isEmpty()
+        {
+            return last < 0;
+        }
+
+        private boolean isValid(ChannelProxy channel)
+        {
+            return this.channel.filePath().equals(channel.filePath());
+        }
+
+        private Region floor(long position)
+        {
+            assert 0 <= position && position < length : String.format("%d >= %d", position, length);
+
+            int idx = Arrays.binarySearch(offsets, 0, last +1, position);
+            assert idx != -1 : String.format("Bad position %d for regions %s, last %d in %s", position, Arrays.toString(offsets), last, channel);
+            if (idx < 0)
+                idx = -(idx + 2); // round down to entry at insertion point
+
+            return new Region(offsets[idx], buffers[idx]);
+        }
+
+        private long getPosition()
+        {
+            return last < 0 ? 0 : offsets[last] + buffers[last].capacity();
+        }
+
+        private void add(long pos, long size)
+        {
+            ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, pos, size);
+
+            ++last;
+
+            if (last == offsets.length)
+            {
+                offsets = Arrays.copyOf(offsets, offsets.length + REGION_ALLOC_SIZE);
+                buffers = Arrays.copyOf(buffers, buffers.length + REGION_ALLOC_SIZE);
+            }
+
+            offsets[last] = pos;
+            buffers[last] = buffer;
+        }
+
+        private Throwable close(Throwable accumulate)
+        {
+            accumulate = channel.close(accumulate);
+
+            /*
+             * Try forcing the unmapping of segments using undocumented unsafe sun APIs.
+             * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
+             * If this works and a thread tries to access any segment, hell will unleash on earth.
+             */
+            if (!FileUtils.isCleanerAvailable())
+                return accumulate;
+
+            return perform(accumulate, channel.filePath(), Throwables.FileOpType.READ,
+                           of(buffers)
+                           .map((buffer) ->
+                                () ->
+                                {
+                                    if (buffer != null)
+                                        FileUtils.clean(buffer);
+                                }));
+        }
+    }
+
+    public static final class Tidier implements RefCounted.Tidy
+    {
+        final State state;
+
+        Tidier(State state)
+        {
+            this.state = state;
+        }
+
+        public String name()
+        {
+            return state.channel.filePath();
+        }
+
+        public void tidy()
+        {
+            try
+            {
+                Throwables.maybeFail(state.close(null));
+            }
+            catch (Exception e)
+            {
+                throw new FSReadError(e, state.channel.filePath());
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 879ca6f..5f56ff6 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -18,41 +18,31 @@
 package org.apache.cassandra.io.util;
 
 import java.io.*;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 public class MmappedSegmentedFile extends SegmentedFile
 {
     private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
 
-    // in a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size to stay sane.
-    public static long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
+    private final MmappedRegions regions;
 
-    /**
-     * Sorted array of segment offsets and MappedByteBuffers for segments. If mmap is completely disabled, or if the
-     * segment would be too long to mmap, the value for an offset will be null, indicating that we need to fall back
-     * to a RandomAccessFile.
-     */
-    private final Segment[] segments;
-
-    public MmappedSegmentedFile(ChannelProxy channel, int bufferSize, long length, Segment[] segments)
+    public MmappedSegmentedFile(ChannelProxy channel, int bufferSize, long length, MmappedRegions regions)
     {
-        super(new Cleanup(channel, segments), channel, bufferSize, length);
-        this.segments = segments;
+        super(new Cleanup(channel, regions), channel, bufferSize, length);
+        this.regions = regions;
     }
 
     private MmappedSegmentedFile(MmappedSegmentedFile copy)
     {
         super(copy);
-        this.segments = copy.segments;
+        this.regions = copy.regions;
     }
 
     public MmappedSegmentedFile sharedCopy()
@@ -60,78 +50,46 @@ public class MmappedSegmentedFile extends SegmentedFile
         return new MmappedSegmentedFile(this);
     }
 
-    /**
-     * @return The segment entry for the given position.
-     */
-    private Segment floor(long position)
+    public RandomAccessReader createReader()
     {
-        assert 0 <= position && position < length: String.format("%d >= %d in %s", position, length, path());
-        Segment seg = new Segment(position, null);
-        int idx = Arrays.binarySearch(segments, seg);
-        assert idx != -1 : String.format("Bad position %d for segments %s in %s", position, Arrays.toString(segments), path());
-        if (idx < 0)
-            // round down to entry at insertion point
-            idx = -(idx + 2);
-        return segments[idx];
+        return new RandomAccessReader.Builder(channel)
+               .overrideLength(length)
+               .regions(regions)
+               .build();
     }
 
-    /**
-     * @return The segment containing the given position: must be closed after use.
-     */
-    public FileDataInput getSegment(long position)
+    public RandomAccessReader createReader(RateLimiter limiter)
     {
-        Segment segment = floor(position);
-        if (segment.right != null)
-        {
-            // segment is mmap'd
-            return new ByteBufferDataInput(segment.right, path(), segment.left, (int) (position - segment.left));
-        }
-
-        // we can have single cells or partitions larger than 2Gb, which is our maximum addressable range in a single segment;
-        // in this case we open as a normal random access reader
-        // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row
-        RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L);
-        file.seek(position);
-        return file;
+        return new RandomAccessReader.Builder(channel)
+               .overrideLength(length)
+               .bufferSize(bufferSize)
+               .regions(regions)
+               .limiter(limiter)
+               .build();
     }
 
     private static final class Cleanup extends SegmentedFile.Cleanup
     {
-        final Segment[] segments;
-        protected Cleanup(ChannelProxy channel, Segment[] segments)
+        private final MmappedRegions regions;
+
+        Cleanup(ChannelProxy channel, MmappedRegions regions)
         {
             super(channel);
-            this.segments = segments;
+            this.regions = regions;
         }
 
         public void tidy()
         {
-            super.tidy();
-
-            if (!FileUtils.isCleanerAvailable())
-                return;
-
-        /*
-         * Try forcing the unmapping of segments using undocumented unsafe sun APIs.
-         * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
-         * If this works and a thread tries to access any segment, hell will unleash on earth.
-         */
-            try
-            {
-                for (Segment segment : segments)
-                {
-                    if (segment.right == null)
-                        continue;
-                    FileUtils.clean(segment.right);
-                }
-                logger.debug("All segments have been unmapped successfully");
-            }
-            catch (Exception e)
+            Throwable err = regions.close(null);
+            if (err != null)
             {
-                JVMStabilityInspector.inspectThrowable(e);
+                JVMStabilityInspector.inspectThrowable(err);
+
                 // This is not supposed to happen
-                logger.error("Error while unmapping segments", e);
+                logger.error("Error while closing mmapped regions", err);
             }
+
+            super.tidy();
         }
     }
 
@@ -140,104 +98,64 @@ public class MmappedSegmentedFile extends SegmentedFile
      */
     static class Builder extends SegmentedFile.Builder
     {
-        // planned segment boundaries
-        private List<Long> boundaries;
-
-        // offset of the open segment (first segment begins at 0).
-        private long currentStart = 0;
-
-        // current length of the open segment.
-        // used to allow merging multiple too-large-to-mmap segments, into a single buffered segment.
-        private long currentSize = 0;
+        private MmappedRegions regions;
 
-        public Builder()
+        Builder()
         {
             super();
-            boundaries = new ArrayList<>();
-            boundaries.add(0L);
-        }
-
-        public void addPotentialBoundary(long boundary)
-        {
-            if (boundary - currentStart <= MAX_SEGMENT_SIZE)
-            {
-                // boundary fits into current segment: expand it
-                currentSize = boundary - currentStart;
-                return;
-            }
-
-            // close the current segment to try and make room for the boundary
-            if (currentSize > 0)
-            {
-                currentStart += currentSize;
-                boundaries.add(currentStart);
-            }
-            currentSize = boundary - currentStart;
-
-            // if we couldn't make room, the boundary needs its own segment
-            if (currentSize > MAX_SEGMENT_SIZE)
-            {
-                currentStart = boundary;
-                boundaries.add(currentStart);
-                currentSize = 0;
-            }
         }
 
         public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
         {
             long length = overrideLength > 0 ? overrideLength : channel.size();
-            // create the segments
-            return new MmappedSegmentedFile(channel, bufferSize, length, createSegments(channel, length));
+            updateRegions(channel, length);
+
+            return new MmappedSegmentedFile(channel, bufferSize, length, regions.sharedCopy());
         }
 
-        private Segment[] createSegments(ChannelProxy channel, long length)
+        private void updateRegions(ChannelProxy channel, long length)
         {
-            // if we're early finishing a range that doesn't span multiple segments, but the finished file now does,
-            // we remove these from the end (we loop incase somehow this spans multiple segments, but that would
-            // be a loco dataset
-            while (length < boundaries.get(boundaries.size() - 1))
-                boundaries.remove(boundaries.size() -1);
-
-            // add a sentinel value == length
-            List<Long> boundaries = new ArrayList<>(this.boundaries);
-            if (length != boundaries.get(boundaries.size() - 1))
-                boundaries.add(length);
-
-            int segcount = boundaries.size() - 1;
-            Segment[] segments = new Segment[segcount];
-            for (int i = 0; i < segcount; i++)
+            if (regions != null && !regions.isValid(channel))
             {
-                long start = boundaries.get(i);
-                long size = boundaries.get(i + 1) - start;
-                MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
-                                           ? channel.map(FileChannel.MapMode.READ_ONLY, start, size)
-                                           : null;
-                segments[i] = new Segment(start, segment);
+                Throwable err = regions.close(null);
+                if (err != null)
+                    logger.error("Failed to close mapped regions", err);
+
+                regions = null;
             }
-            return segments;
+
+            if (regions == null)
+                regions = MmappedRegions.map(channel, length);
+            else
+                regions.extend(length);
         }
 
         @Override
-        public void serializeBounds(DataOutput out) throws IOException
+        public void serializeBounds(DataOutput out, Version version) throws IOException
         {
-            super.serializeBounds(out);
-            out.writeInt(boundaries.size());
-            for (long position: boundaries)
-                out.writeLong(position);
+            if (!version.hasBoundaries())
+                return;
+
+            super.serializeBounds(out, version);
+            out.writeInt(0);
         }
 
         @Override
-        public void deserializeBounds(DataInput in) throws IOException
+        public void deserializeBounds(DataInput in, Version version) throws IOException
         {
-            super.deserializeBounds(in);
+            if (!version.hasBoundaries())
+                return;
 
-            int size = in.readInt();
-            List<Long> temp = new ArrayList<>(size);
-            
-            for (int i = 0; i < size; i++)
-                temp.add(in.readLong());
+            super.deserializeBounds(in, version);
+            in.skipBytes(in.readInt() * TypeSizes.sizeof(0L));
+        }
 
-            boundaries = temp;
+        @Override
+        public Throwable close(Throwable accumulate)
+        {
+            return super.close(regions == null
+                               ? accumulate
+                               : regions.close(accumulate));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
index f6c939a..e599a69 100644
--- a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
@@ -18,17 +18,11 @@
 package org.apache.cassandra.io.util;
 
 import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 
-import org.apache.cassandra.utils.vint.VIntCoding;
-
 import com.google.common.base.Preconditions;
 
 /**
@@ -43,357 +37,59 @@ import com.google.common.base.Preconditions;
  *
  * NIODataInputStream is not thread safe.
  */
-public class NIODataInputStream extends InputStream implements DataInputPlus, Closeable
+public class NIODataInputStream extends RebufferingInputStream
 {
-    private final ReadableByteChannel rbc;
-    private final ByteBuffer buf;
-
-    /*
-     *  Used when wrapping a fixed buffer of data instead of a channel. Should never attempt
-     *  to read from it.
-     */
-    private static final ReadableByteChannel emptyReadableByteChannel = new ReadableByteChannel()
-    {
-
-        @Override
-        public boolean isOpen()
-        {
-            return true;
-        }
-
-        @Override
-        public void close() throws IOException
-        {
-        }
-
-        @Override
-        public int read(ByteBuffer dst) throws IOException
-        {
-            throw new AssertionError();
-        }
-
-    };
-
-    public NIODataInputStream(ReadableByteChannel rbc, int bufferSize)
-    {
-        Preconditions.checkNotNull(rbc);
-        Preconditions.checkArgument(bufferSize >= 9, "Buffer size must be large enough to accomadate a varint");
-        this.rbc = rbc;
-        buf = ByteBuffer.allocateDirect(bufferSize);
-        buf.position(0);
-        buf.limit(0);
-    }
-
-    protected NIODataInputStream(ByteBuffer buf, boolean duplicate)
-    {
-        Preconditions.checkNotNull(buf);
-        if (duplicate)
-            this.buf = buf.duplicate();
-        else
-            this.buf = buf;
-
-        this.rbc = emptyReadableByteChannel;
-    }
-
-    /*
-     * The decision to duplicate or not really needs to conscious since it a real impact
-     * in terms of thread safety so don't expose this constructor with an implicit default.
-     */
-    protected NIODataInputStream(ByteBuffer buf)
-    {
-        this(buf, false);
-    }
-
-    @Override
-    public void readFully(byte[] b) throws IOException
-    {
-        readFully(b, 0, b.length);
-    }
-
-
-    @Override
-    public void readFully(byte[] b, int off, int len) throws IOException
-    {
-        int copied = 0;
-        while (copied < len)
-        {
-            int read = read(b, off + copied, len - copied);
-            if (read < 0)
-                throw new EOFException();
-            copied += read;
-        }
-    }
-
-    @Override
-    public int read(byte b[], int off, int len) throws IOException {
-        if (b == null)
-            throw new NullPointerException();
-
-        // avoid int overflow
-        if (off < 0 || off > b.length || len < 0
-                || len > b.length - off)
-            throw new IndexOutOfBoundsException();
-
-        if (len == 0)
-            return 0;
-
-        int copied = 0;
-        while (copied < len)
-        {
-            if (buf.hasRemaining())
-            {
-                int toCopy = Math.min(len - copied, buf.remaining());
-                buf.get(b, off + copied, toCopy);
-                copied += toCopy;
-            }
-            else
-            {
-                int read = readNext();
-                if (read < 0 && copied == 0) return -1;
-                if (read <= 0) return copied;
-            }
-        }
-
-        return copied;
-    }
-
-    /*
-     * Refill the buffer, preserving any unread bytes remaining in the buffer
-     */
-    protected int readNext() throws IOException
-    {
-        Preconditions.checkState(buf.remaining() != buf.capacity());
-        assert(buf.remaining() < 9);
-
-        /*
-         * If there is data already at the start of the buffer, move the position to the end
-         * If there is data but not at the start, move it to the start
-         * Otherwise move the position to 0 so writes start at the beginning of the buffer
-         *
-         * We go to the trouble of shuffling the bytes remaining for cases where the buffer isn't fully drained
-         * while retrieving a multi-byte value while the position is in the middle.
-         */
-        if (buf.position() == 0 && buf.hasRemaining())
-        {
-            buf.position(buf.limit());
-        }
-        else if (buf.hasRemaining())
-        {
-            //FastByteOperations.copy failed to do the copy so inline a simple one here
-            int position = buf.position();
-            int remaining  = buf.remaining();
-            buf.clear();
-            for (int ii = 0; ii < remaining; ii++)
-                buf.put(buf.get(position + ii));
-        }
-        else
-        {
-            buf.position(0);
-        }
-
-        buf.limit(buf.capacity());
-
-        int read = 0;
-        while ((read = rbc.read(buf)) == 0) {}
-
-        buf.flip();
-
-        return read;
-    }
-
-    /*
-     * Read the minimum number of bytes and throw EOF if the minimum could not be read
-     */
-    private void readMinimum(int minimum) throws IOException
-    {
-        assert(buf.remaining() < 8);
-        while (buf.remaining() < minimum)
-        {
-            int read = readNext();
-            if (read == -1)
-            {
-                //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here
-                buf.position(0);
-                buf.limit(0);
-                throw new EOFException();
-            }
-        }
-    }
-
-    /*
-     * Ensure the buffer contains the minimum number of readable bytes, throws EOF if enough bytes aren't available
-     */
-    private void prepareReadPrimitive(int minimum) throws IOException
-    {
-        if (buf.remaining() < minimum)
-            readMinimum(minimum);
-    }
-
-    @Override
-    public int skipBytes(int n) throws IOException
-    {
-        int skipped = 0;
-
-        while (skipped < n)
-        {
-            int skippedThisTime = (int)skip(n - skipped);
-            if (skippedThisTime <= 0) break;
-            skipped += skippedThisTime;
-        }
-
-        return skipped;
-    }
-
-    @Override
-    public boolean readBoolean() throws IOException
-    {
-        prepareReadPrimitive(1);
-        return buf.get() != 0;
-    }
-
-    @Override
-    public byte readByte() throws IOException
-    {
-        prepareReadPrimitive(1);
-        return buf.get();
-    }
-
-    @Override
-    public int readUnsignedByte() throws IOException
-    {
-        prepareReadPrimitive(1);
-        return buf.get() & 0xff;
-    }
-
-    @Override
-    public short readShort() throws IOException
-    {
-        prepareReadPrimitive(2);
-        return buf.getShort();
-    }
-
-    @Override
-    public int readUnsignedShort() throws IOException
-    {
-        return readShort() & 0xFFFF;
-    }
-
-    @Override
-    public char readChar() throws IOException
-    {
-        prepareReadPrimitive(2);
-        return buf.getChar();
-    }
-
-    @Override
-    public int readInt() throws IOException
-    {
-        prepareReadPrimitive(4);
-        return buf.getInt();
-    }
+    protected final ReadableByteChannel channel;
 
-    @Override
-    public long readLong() throws IOException
+    private static ByteBuffer makeBuffer(int bufferSize)
     {
-        prepareReadPrimitive(8);
-        return buf.getLong();
-    }
+        ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
+        buffer.position(0);
+        buffer.limit(0);
 
-    public long readVInt() throws IOException
-    {
-        return VIntCoding.decodeZigZag64(readUnsignedVInt());
+        return buffer;
     }
 
-    public long readUnsignedVInt() throws IOException
+    public NIODataInputStream(ReadableByteChannel channel, ByteBuffer buffer)
     {
-        //If 9 bytes aren't available use the slow path in VIntCoding
-        if (buf.remaining() < 9)
-            return VIntCoding.readUnsignedVInt(this);
+        super(buffer);
 
-        byte firstByte = buf.get();
-
-        //Bail out early if this is one byte, necessary or it fails later
-        if (firstByte >= 0)
-            return firstByte;
-
-        int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte);
-
-        int position = buf.position();
-        int extraBits = extraBytes * 8;
-
-        long retval = buf.getLong(position);
-        if (buf.order() == ByteOrder.LITTLE_ENDIAN)
-            retval = Long.reverseBytes(retval);
-        buf.position(position + extraBytes);
-
-        // truncate the bytes we read in excess of those we needed
-        retval >>>= 64 - extraBits;
-        // remove the non-value bits from the first byte
-        firstByte &= VIntCoding.firstByteValueMask(extraBytes);
-        // shift the first byte up to its correct position
-        retval |= (long) firstByte << extraBits;
-        return retval;
+        Preconditions.checkNotNull(channel);
+        this.channel = channel;
     }
 
-    @Override
-    public float readFloat() throws IOException
+    public NIODataInputStream(ReadableByteChannel channel, int bufferSize)
     {
-        prepareReadPrimitive(4);
-        return buf.getFloat();
+        this(channel, makeBuffer(bufferSize));
     }
 
     @Override
-    public double readDouble() throws IOException
+    protected void reBuffer() throws IOException
     {
-        prepareReadPrimitive(8);
-        return buf.getDouble();
-    }
+        Preconditions.checkState(buffer.remaining() == 0);
+        buffer.clear();
 
-    @Override
-    public String readLine() throws IOException
-    {
-        throw new UnsupportedOperationException();
-    }
+        while ((channel.read(buffer)) == 0) {}
 
-    @Override
-    public String readUTF() throws IOException
-    {
-        return DataInputStream.readUTF(this);
+        buffer.flip();
     }
 
     @Override
     public void close() throws IOException
     {
-            rbc.close();
-    }
-
-    @Override
-    public int read() throws IOException
-    {
-        return readUnsignedByte();
+        channel.close();
+        super.close();
     }
 
     @Override
     public int available() throws IOException
     {
-        if (rbc instanceof SeekableByteChannel)
+        if (channel instanceof SeekableByteChannel)
         {
-            SeekableByteChannel sbc = (SeekableByteChannel)rbc;
+            SeekableByteChannel sbc = (SeekableByteChannel) channel;
             long remainder = Math.max(0, sbc.size() - sbc.position());
-            return (remainder > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)(remainder + buf.remaining());
+            return (remainder > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)(remainder + buffer.remaining());
         }
-        return buf.remaining();
-    }
-
-    @Override
-    public void reset() throws IOException
-    {
-        throw new IOException("mark/reset not supported");
-    }
-
-    @Override
-    public boolean markSupported()
-    {
-        return false;
+        return buffer.remaining();
     }
 }


Mime
View raw message