cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject git commit: Move sstableRandomAccessReader to nio2 patch by Josh McKenzie; reviewed by Benedict Elliott Smith for CASSANDRA-4050
Date Mon, 07 Apr 2014 20:51:52 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 5dfe24124 -> c18ce589e


Move sstableRandomAccessReader to nio2
patch by Josh McKenzie; reviewed by Benedict Elliott Smith for CASSANDRA-4050


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c18ce589
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c18ce589
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c18ce589

Branch: refs/heads/trunk
Commit: c18ce589efdf480ad4623298ffb7038eb4091afb
Parents: 5dfe241
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Mon Apr 7 15:51:35 2014 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Mon Apr 7 15:51:46 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../compress/CompressedRandomAccessReader.java  | 104 ++++++-----
 .../io/compress/CompressedThrottledReader.java  |   2 +-
 .../cassandra/io/util/AbstractDataInput.java    |  24 ++-
 .../cassandra/io/util/MappedFileDataInput.java  |  34 ++--
 .../cassandra/io/util/MemoryInputStream.java    |  19 +-
 .../cassandra/io/util/RandomAccessReader.java   | 177 +++++++++----------
 .../cassandra/io/util/ThrottledReader.java      |   2 +-
 .../apache/cassandra/utils/ByteBufferUtil.java  |   9 +
 .../utils/vint/EncodedDataInputStream.java      |  18 +-
 .../io/util/BufferedRandomAccessFileTest.java   |  38 ----
 11 files changed, 208 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64a53b8..8338f1b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.0
+ * Move sstable RandomAccessReader to nio2, which allows using the
+   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
  * Remove CQL2 (CASSANDRA-5918)
  * Add Thrift get_multi_slice call (CASSANDRA-6757)
  * Optimize fetching multiple cells by name (CASSANDRA-6933)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 131a4d6..d71964c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -79,70 +79,80 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
     }
 
+    protected ByteBuffer allocateBuffer(int bufferSize)
+    {
+        assert Integer.bitCount(bufferSize) == 1;
+        return ByteBuffer.allocate(bufferSize);
+    }
+
     @Override
     protected void reBuffer()
     {
         try
         {
-            decompressChunk(metadata.chunkFor(current));
-        }
-        catch (CorruptBlockException e)
-        {
-            throw new CorruptSSTableException(e, getPath());
-        }
-        catch (IOException e)
-        {
-            throw new FSReadError(e, getPath());
-        }
-    }
-
-    private void decompressChunk(CompressionMetadata.Chunk chunk) throws IOException
-    {
-        if (channel.position() != chunk.offset)
-            channel.position(chunk.offset);
+            long position = current();
+            assert position < metadata.dataLength;
 
-        if (compressed.capacity() < chunk.length)
-            compressed = ByteBuffer.wrap(new byte[chunk.length]);
-        else
-            compressed.clear();
-        compressed.limit(chunk.length);
+            CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 
-        if (channel.read(compressed) != chunk.length)
-            throw new CorruptBlockException(getPath(), chunk);
+            if (channel.position() != chunk.offset)
+                channel.position(chunk.offset);
 
-        // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
-        // in the future this will save a lot of hair-pulling
-        compressed.flip();
-        try
-        {
-            validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
-        }
-        catch (IOException e)
-        {
-            throw new CorruptBlockException(getPath(), chunk);
-        }
+            if (compressed.capacity() < chunk.length)
+                compressed = ByteBuffer.wrap(new byte[chunk.length]);
+            else
+                compressed.clear();
+            compressed.limit(chunk.length);
 
-        if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
-        {
+            if (channel.read(compressed) != chunk.length)
+                throw new CorruptBlockException(getPath(), chunk);
 
-            if (metadata.hasPostCompressionAdlerChecksums)
+            // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
+            // in the future this will save a lot of hair-pulling
+            compressed.flip();
+            buffer.clear();
+            int decompressedBytes;
+            try
             {
-                checksum.update(compressed.array(), 0, chunk.length);
+                decompressedBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer.array(), 0);
+                buffer.limit(decompressedBytes);
             }
-            else
+            catch (IOException e)
             {
-                checksum.update(buffer, 0, validBufferBytes);
+                throw new CorruptBlockException(getPath(), chunk);
             }
 
-            if (checksum(chunk) != (int) checksum.getValue())
-                throw new CorruptBlockException(getPath(), chunk);
+            if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
+            {
 
-            // reset checksum object back to the original (blank) state
-            checksum.reset();
-        }
+                if (metadata.hasPostCompressionAdlerChecksums)
+                {
+                    checksum.update(compressed.array(), 0, chunk.length);
+                }
+                else
+                {
+                    checksum.update(buffer.array(), 0, decompressedBytes);
+                }
 
-        // buffer offset is always aligned
-        bufferOffset = current & ~(buffer.length - 1);
+                if (checksum(chunk) != (int) checksum.getValue())
+                    throw new CorruptBlockException(getPath(), chunk);
+
+                // reset checksum object back to the original (blank) state
+                checksum.reset();
+            }
+
+            // buffer offset is always aligned
+            bufferOffset = position & ~(buffer.capacity() - 1);
+            buffer.position((int) (position - bufferOffset));
+        }
+        catch (CorruptBlockException e)
+        {
+            throw new CorruptSSTableException(e, getPath());
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
     }
 
     private int checksum(CompressionMetadata.Chunk chunk) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
index c5ae795..2495d17 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
@@ -37,7 +37,7 @@ public class CompressedThrottledReader extends CompressedRandomAccessReader
 
     protected void reBuffer()
     {
-        limiter.acquire(buffer.length);
+        limiter.acquire(buffer.capacity());
         super.reBuffer();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
index ff8b6b2..2815260 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
@@ -21,12 +21,20 @@ import java.io.*;
 
 public abstract class AbstractDataInput extends InputStream implements DataInput
 {
-    protected abstract void seekInternal(int position);
-    protected abstract int getPosition();
+    protected abstract void seek(long position) throws IOException;
+    protected abstract long getPosition();
+    protected abstract long getPositionLimit();
 
-    /*
-     !! DataInput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
-     */
+    public int skipBytes(int n) throws IOException
+    {
+        if (n <= 0)
+            return 0;
+        long oldPosition = getPosition();
+        seek(Math.min(getPositionLimit(), oldPosition + n));
+        long skipped = getPosition() - oldPosition;
+        assert skipped >= 0 && skipped <= n;
+        return (int) skipped;
+    }
 
     /**
      * Reads a boolean from the current position in this file. Blocks until one
@@ -214,7 +222,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
     public final String readLine() throws IOException {
         StringBuilder line = new StringBuilder(80); // Typical line length
         boolean foundTerminator = false;
-        int unreadPosition = 0;
+        long unreadPosition = -1;
         while (true) {
             int nextByte = read();
             switch (nextByte) {
@@ -222,7 +230,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
                     return line.length() != 0 ? line.toString() : null;
                 case (byte) '\r':
                     if (foundTerminator) {
-                        seekInternal(unreadPosition);
+                        seek(unreadPosition);
                         return line.toString();
                     }
                     foundTerminator = true;
@@ -233,7 +241,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
                     return line.toString();
                 default:
                     if (foundTerminator) {
-                        seekInternal(unreadPosition);
+                        seek(unreadPosition);
                         return line.toString();
                     }
                     line.append((char) nextByte);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
index f397ddc..0479256 100644
--- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
@@ -24,7 +24,7 @@ import java.nio.channels.FileChannel;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class MappedFileDataInput extends AbstractDataInput implements FileDataInput
+public class MappedFileDataInput extends AbstractDataInput implements FileDataInput, DataInput
 {
     private final MappedByteBuffer buffer;
     private final String filename;
@@ -49,12 +49,6 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
         this.position = position;
     }
 
-    // don't make this public, this is only for seeking WITHIN the current mapped segment
-    protected void seekInternal(int pos)
-    {
-        position = pos;
-    }
-
     // Only use when we know the seek in within the mapped segment. Throws an
     // IOException otherwise.
     public void seek(long pos) throws IOException
@@ -63,17 +57,22 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
         if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
             throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity()));
 
-        seekInternal((int) inSegmentPos);
+        position = (int) inSegmentPos;
     }
 
     public long getFilePointer()
     {
-        return segmentOffset + (long)position;
+        return segmentOffset + position;
     }
 
-    protected int getPosition()
+    protected long getPosition()
     {
-        return position;
+        return segmentOffset + position;
+    }
+
+    protected long getPositionLimit()
+    {
+        return segmentOffset + buffer.capacity();
     }
 
     @Override
@@ -85,7 +84,7 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
     public void reset(FileMark mark) throws IOException
     {
         assert mark instanceof MappedFileDataInputMark;
-        seekInternal(((MappedFileDataInputMark) mark).position);
+        position = ((MappedFileDataInputMark) mark).position;
     }
 
     public FileMark mark()
@@ -162,17 +161,6 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
         throw new UnsupportedOperationException("use readBytes instead");
     }
 
-    public int skipBytes(int n) throws IOException
-    {
-        assert n >= 0 : "skipping negative bytes is illegal: " + n;
-        if (n == 0)
-            return 0;
-        int oldPosition = position;
-        assert ((long)oldPosition) + n <= Integer.MAX_VALUE;
-        position = Math.min(buffer.capacity(), position + n);
-        return position - oldPosition;
-    }
-
     private static class MappedFileDataInputMark implements FileMark
     {
         int position;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
index eee030a..73ccc1b 100644
--- a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
@@ -17,9 +17,10 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.io.DataInput;
 import java.io.IOException;
 
-public class MemoryInputStream extends AbstractDataInput
+public class MemoryInputStream extends AbstractDataInput implements DataInput
 {
     private final Memory mem;
     private int position = 0;
@@ -40,20 +41,24 @@ public class MemoryInputStream extends AbstractDataInput
         position += count;
     }
 
-    protected void seekInternal(int pos)
+    protected void seek(long pos)
     {
-        position = pos;
+        position = (int) pos;
     }
 
-    protected int getPosition()
+    protected long getPosition()
     {
         return position;
     }
 
-    public int skipBytes(int n) throws IOException
+    protected long getPositionLimit()
     {
-        seekInternal(getPosition() + n);
-        return position;
+        return mem.size();
+    }
+
+    protected long length()
+    {
+        return mem.size();
     }
 
     public void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 8347cd9..e395510 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -20,12 +20,14 @@ package org.apache.cassandra.io.util;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class RandomAccessReader extends RandomAccessFile implements FileDataInput
+public class RandomAccessReader extends AbstractDataInput implements FileDataInput
 {
     public static final long CACHE_FLUSH_INTERVAL_IN_BYTES = (long) Math.pow(2, 27); // 128mb
 
@@ -36,17 +38,13 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     private final String filePath;
 
     // buffer which will cache file blocks
-    protected byte[] buffer;
+    protected ByteBuffer buffer;
 
-    // `current` as current position in file
     // `bufferOffset` is the offset of the beginning of the buffer
     // `markedPointer` folds the offset of the last file mark
-    protected long bufferOffset, current = 0, markedPointer;
-    // `validBufferBytes` is the number of bytes in the buffer that are actually valid;
-    //  this will be LESS than buffer capacity if buffer is not full!
-    protected int validBufferBytes = 0;
+    protected long bufferOffset, markedPointer;
 
-    // channel liked with the file, used to retrieve data and force updates.
+    // channel linked with the file, used to retrieve data and force updates.
     protected final FileChannel channel;
 
     private final long fileLength;
@@ -55,19 +53,23 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
 
     protected RandomAccessReader(File file, int bufferSize, PoolingSegmentedFile owner) throws FileNotFoundException
     {
-        super(file, "r");
-
         this.owner = owner;
 
-        channel = super.getChannel();
         filePath = file.getAbsolutePath();
 
+        try
+        {
+            channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+        }
+        catch (IOException e)
+        {
+            throw new FileNotFoundException(filePath);
+        }
+
         // allocating required size of the buffer
         if (bufferSize <= 0)
             throw new IllegalArgumentException("bufferSize must be positive");
 
-        buffer = new byte[bufferSize];
-
         // we can cache file length in read-only mode
         try
         {
@@ -77,7 +79,13 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         {
             throw new FSReadError(e, filePath);
         }
-        validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations
+        buffer = allocateBuffer(bufferSize);
+        buffer.limit(0);
+    }
+
+    protected ByteBuffer allocateBuffer(int bufferSize)
+    {
+        return ByteBuffer.allocate((int) Math.min(fileLength, bufferSize));
     }
 
     public static RandomAccessReader open(File file, PoolingSegmentedFile owner)
@@ -97,7 +105,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         {
             return new RandomAccessReader(file, bufferSize, owner);
         }
-        catch (FileNotFoundException e)
+        catch (IOException e)
         {
             throw new RuntimeException(e);
         }
@@ -109,31 +117,31 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, null);
     }
 
+    // channel extends FileChannel, impl SeekableByteChannel.  Safe to cast.
+    public FileChannel getChannel()
+    {
+        return channel;
+    }
+
     /**
      * Read data from file starting from current currentOffset to populate buffer.
      */
     protected void reBuffer()
     {
-        resetBuffer();
+        bufferOffset += buffer.position();
+        buffer.clear();
+        assert bufferOffset < fileLength;
 
         try
         {
-            if (bufferOffset >= channel.size())
-                return;
-
             channel.position(bufferOffset); // setting channel position
-
-            int read = 0;
-
-            while (read < buffer.length)
+            while (buffer.hasRemaining())
             {
-                int n = super.read(buffer, read, buffer.length - read);
+                int n = channel.read(buffer);
                 if (n < 0)
                     break;
-                read += n;
             }
-
-            validBufferBytes = read;
+            buffer.flip();
         }
         catch (IOException e)
         {
@@ -144,7 +152,12 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     @Override
     public long getFilePointer()
     {
-        return current;
+        return current();
+    }
+
+    protected long current()
+    {
+        return bufferOffset + (buffer == null ? 0 : buffer.position());
     }
 
     public String getPath()
@@ -154,7 +167,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
 
     public int getTotalBufferSize()
     {
-        return buffer.length;
+        return buffer.capacity();
     }
 
     public void reset()
@@ -164,14 +177,14 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
 
     public long bytesPastMark()
     {
-        long bytes = current - markedPointer;
+        long bytes = current() - markedPointer;
         assert bytes >= 0;
         return bytes;
     }
 
     public FileMark mark()
     {
-        markedPointer = current;
+        markedPointer = current();
         return new BufferedRandomAccessFileMark(markedPointer);
     }
 
@@ -184,7 +197,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     public long bytesPastMark(FileMark mark)
     {
         assert mark instanceof BufferedRandomAccessFileMark;
-        long bytes = current - ((BufferedRandomAccessFileMark) mark).pointer;
+        long bytes = current() - ((BufferedRandomAccessFileMark) mark).pointer;
         assert bytes >= 0;
         return bytes;
     }
@@ -202,17 +215,6 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return length() - getFilePointer();
     }
 
-    protected int bufferCursor()
-    {
-        return (int) (current - bufferOffset);
-    }
-
-    protected void resetBuffer()
-    {
-        bufferOffset = current;
-        validBufferBytes = 0;
-    }
-
     @Override
     public void close()
     {
@@ -233,11 +235,12 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
 
     public void deallocate()
     {
+        bufferOffset += buffer.position();
         buffer = null; // makes sure we don't use this after it's ostensibly closed
 
         try
         {
-            super.close();
+            channel.close();
         }
         catch (IOException e)
         {
@@ -270,17 +273,28 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         if (newPosition < 0)
             throw new IllegalArgumentException("new position should not be negative");
 
-        if (newPosition > length()) // it is save to call length() in read-only mode
-            throw new IllegalArgumentException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
+        if (newPosition >= length()) // it is save to call length() in read-only mode
+        {
+            if (newPosition > length())
+                throw new IllegalArgumentException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
                                                              newPosition, getPath(), length()));
+            buffer.limit(0);
+            bufferOffset = newPosition;
+            return;
+        }
 
-        current = newPosition;
-
-        if (newPosition > (bufferOffset + validBufferBytes) || newPosition < bufferOffset)
-            reBuffer();
+        if (newPosition >= bufferOffset && newPosition < bufferOffset + buffer.limit())
+        {
+            buffer.position((int) (newPosition - bufferOffset));
+            return;
+        }
+        // Set current location to newPosition and clear buffer so reBuffer calculates from newPosition
+        bufferOffset = newPosition;
+        buffer.clear();
+        reBuffer();
+        assert current() == newPosition;
     }
 
-    @Override
     // -1 will be returned if there is nothing to read; higher-level methods like readInt
     // or readFully (from RandomAccessFile) will throw EOFException but this should not
     public int read()
@@ -291,12 +305,10 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         if (isEOF())
             return -1; // required by RandomAccessFile
 
-        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+        if (!buffer.hasRemaining())
             reBuffer();
 
-        assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
-
-        return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
+        return (int)buffer.get() & 0xff;
     }
 
     @Override
@@ -319,47 +331,41 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         if (isEOF())
             return -1;
 
-        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+        if (!buffer.hasRemaining())
             reBuffer();
 
-        assert current >= bufferOffset && current < bufferOffset + validBufferBytes
-                : String.format("File (%s), current offset %d, buffer offset %d, buffer limit %d",
-                                getPath(),
-                                current,
-                                bufferOffset,
-                                validBufferBytes);
-
-        int toCopy = Math.min(length, validBufferBytes - bufferCursor());
-
-        System.arraycopy(buffer, bufferCursor(), buff, offset, toCopy);
-        current += toCopy;
-
+        int toCopy = Math.min(length, buffer.remaining());
+        buffer.get(buff, offset, toCopy);
         return toCopy;
     }
 
     public ByteBuffer readBytes(int length) throws EOFException
     {
         assert length >= 0 : "buffer length should not be negative: " + length;
-
-        byte[] buff = new byte[length];
-
         try
         {
-            readFully(buff); // reading data buffer
+            ByteBuffer result = ByteBuffer.allocate(length);
+            while (result.hasRemaining())
+            {
+                if (isEOF())
+                    throw new EOFException();
+                if (!buffer.hasRemaining())
+                    reBuffer();
+                ByteBufferUtil.put(buffer, result);
+            }
+            result.flip();
+            return result;
         }
         catch (EOFException e)
         {
             throw e;
         }
-        catch (IOException e)
+        catch (Exception e)
         {
             throw new FSReadError(e, filePath);
         }
-
-        return ByteBuffer.wrap(buff);
     }
 
-    @Override
     public long length()
     {
         return fileLength;
@@ -367,24 +373,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
 
     public long getPosition()
     {
-        return current;
-    }
-
-    @Override
-    public void write(int value)
-    {
-        throw new UnsupportedOperationException();
+        return bufferOffset + buffer.position();
     }
 
-    @Override
-    public void write(byte[] buffer)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void write(byte[] buffer, int offset, int length)
+    public long getPositionLimit()
     {
-        throw new UnsupportedOperationException();
+        return length();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
index b12a8a8..b9b645a 100644
--- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
@@ -38,7 +38,7 @@ public class ThrottledReader extends RandomAccessReader
 
     protected void reBuffer()
     {
-        limiter.acquire(buffer.length);
+        limiter.acquire(buffer.capacity());
         super.reBuffer();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index f20a46a..91aa6f7 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -273,6 +273,15 @@ public class ByteBufferUtil
         FastByteOperations.copy(src, srcPos, dst, dstPos, length);
     }
 
+    public static int put(ByteBuffer src, ByteBuffer trg)
+    {
+        int length = Math.min(src.remaining(), trg.remaining());
+        arrayCopy(src, src.position(), trg, trg.position(), length);
+        trg.position(trg.position() + length);
+        src.position(src.position() + length);
+        return length;
+    }
+
     public static void writeWithLength(ByteBuffer bytes, DataOutputPlus out) throws IOException
     {
         out.writeInt(bytes.remaining());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
index b35d180..6385e5c 100644
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
@@ -25,10 +25,10 @@ import org.apache.cassandra.io.util.AbstractDataInput;
 /**
  * Borrows idea from
  * https://developers.google.com/protocol-buffers/docs/encoding#varints
- * 
+ *
  * Should be used with EncodedDataOutputStream
  */
-public class EncodedDataInputStream extends AbstractDataInput
+public class EncodedDataInputStream extends AbstractDataInput implements DataInput
 {
     private DataInput input;
 
@@ -47,12 +47,22 @@ public class EncodedDataInputStream extends AbstractDataInput
         return input.readByte() & 0xFF;
     }
 
-    protected void seekInternal(int position)
+    protected void seek(long position)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    protected long getPosition()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    protected long getPositionLimit()
     {
         throw new UnsupportedOperationException();
     }
 
-    protected int getPosition()
+    protected long length()
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index 75de261..8053553 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -547,34 +547,6 @@ public class BufferedRandomAccessFileTest
             }
         }, IllegalArgumentException.class);
 
-        // Any write() call should fail
-        expectException(new Callable<Object>()
-        {
-            public Object call() throws IOException
-            {
-                copy.write(1);
-                return null;
-            }
-        }, UnsupportedOperationException.class);
-
-        expectException(new Callable<Object>()
-        {
-            public Object call() throws IOException
-            {
-                copy.write(new byte[1]);
-                return null;
-            }
-        }, UnsupportedOperationException.class);
-
-        expectException(new Callable<Object>()
-        {
-            public Object call() throws IOException
-            {
-                copy.write(new byte[3], 0, 2);
-                return null;
-            }
-        }, UnsupportedOperationException.class);
-
         copy.seek(0);
         copy.skipBytes(5);
 
@@ -619,16 +591,6 @@ public class BufferedRandomAccessFileTest
         }
     }
 
-    @Test (expected=IOException.class)
-    public void testSetLengthDuringReadMode() throws IOException
-    {
-        File tmpFile = File.createTempFile("set_length_during_read_mode", "bin");
-        try (RandomAccessReader file = RandomAccessReader.open(tmpFile))
-        {
-            file.setLength(4L);
-        }
-    }
-
     private SequentialWriter createTempFile(String name) throws IOException
     {
         File tempFile = File.createTempFile(name, null);


Mime
View raw message