cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [06/10] cassandra git commit: Faster sequential IO (CASSANDRA-8630)
Date Fri, 04 Sep 2015 11:46:27 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index ca15722..1bdc591 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -117,7 +117,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
             }
 
             assert f.exists();
-            RandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32));
+            RandomAccessReader reader = new CompressedRandomAccessReader.Builder(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32)).build();
             assertEquals(dataPre.length + rawPost.length, reader.length());
             byte[] result = new byte[(int)reader.length()];
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 793348a..c7f3c36 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.MmappedRegions;
 import org.apache.cassandra.io.util.MmappedSegmentedFile;
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.schema.CachingParams;
@@ -133,7 +134,7 @@ public class SSTableReaderTest
     @Test
     public void testSpannedIndexPositions() throws IOException
     {
-        MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
+        MmappedRegions.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
 
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index b875a6a..4a72281 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -542,14 +542,12 @@ public class BufferedDataOutputStreamTest
 
         ndosp.flush();
 
-        @SuppressWarnings("resource")
-        ByteBufferDataInput bbdi = new ByteBufferDataInput(ByteBuffer.wrap(generated.toByteArray()), "", 0, 0);
-
+        DataInputBuffer in = new DataInputBuffer(generated.toByteArray());
         assertEquals(expectedSize, generated.toByteArray().length);
 
         for (long v : testValues)
         {
-            assertEquals(v, bbdi.readVInt());
+            assertEquals(v, in.readVInt());
         }
     }
 
@@ -574,13 +572,11 @@ public class BufferedDataOutputStreamTest
 
         ndosp.flush();
 
-        @SuppressWarnings("resource")
-        ByteBufferDataInput bbdi = new ByteBufferDataInput(ByteBuffer.wrap(generated.toByteArray()), "", 0, 0);
-
+        DataInputBuffer in = new DataInputBuffer(generated.toByteArray());
         assertEquals(expectedSize, generated.toByteArray().length);
 
         for (long v : testValues)
-            assertEquals(v, bbdi.readUnsignedVInt());
+            assertEquals(v, in.readUnsignedVInt());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index e051c00..364ea71 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -25,9 +25,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
-import java.util.concurrent.Callable;
 
 import static org.apache.cassandra.Util.expectEOF;
 import static org.apache.cassandra.Util.expectException;
@@ -96,7 +94,7 @@ public class BufferedRandomAccessFileTest
 
         // test readBytes(int) method
         r.seek(0);
-        ByteBuffer fileContent = r.readBytes((int) w.length());
+        ByteBuffer fileContent = ByteBufferUtil.read(r, (int) w.length());
         assertEquals(fileContent.limit(), w.length());
         assert ByteBufferUtil.string(fileContent).equals("Hello" + new String(bigData));
 
@@ -204,25 +202,19 @@ public class BufferedRandomAccessFileTest
         final ChannelProxy channel = new ChannelProxy(w.getPath());
         final RandomAccessReader r = RandomAccessReader.open(channel);
 
-        ByteBuffer content = r.readBytes((int) r.length());
+        ByteBuffer content = ByteBufferUtil.read(r, (int) r.length());
 
         // after reading whole file we should be at EOF
         assertEquals(0, ByteBufferUtil.compare(content, data));
         assert r.bytesRemaining() == 0 && r.isEOF();
 
         r.seek(0);
-        content = r.readBytes(10); // reading first 10 bytes
+        content = ByteBufferUtil.read(r, 10); // reading first 10 bytes
         assertEquals(ByteBufferUtil.compare(content, "cccccccccc".getBytes()), 0);
         assertEquals(r.bytesRemaining(), r.length() - content.limit());
 
         // trying to read more than file has right now
-        expectEOF(new Callable<Object>()
-        {
-            public Object call() throws IOException
-            {
-                return r.readBytes((int) r.length() + 10);
-            }
-        });
+        expectEOF(() -> ByteBufferUtil.read(r, (int) r.length() + 10));
 
         w.finish();
         r.close();
@@ -249,23 +241,9 @@ public class BufferedRandomAccessFileTest
         assertEquals(file.bytesRemaining(), file.length() - 20);
 
         // trying to seek past the end of the file should produce EOFException
-        expectException(new Callable<Object>()
-        {
-            public Object call()
-            {
-                file.seek(file.length() + 30);
-                return null;
-            }
-        }, IllegalArgumentException.class);
+        expectException(() -> { file.seek(file.length() + 30); return null; }, IllegalArgumentException.class);
 
-        expectException(new Callable<Object>()
-        {
-            public Object call() throws IOException
-            {
-                file.seek(-1);
-                return null;
-            }
-        }, IllegalArgumentException.class); // throws IllegalArgumentException
+        expectException(() -> { file.seek(-1); return null; }, IllegalArgumentException.class); // throws IllegalArgumentException
 
         file.close();
         channel.close();
@@ -352,16 +330,11 @@ public class BufferedRandomAccessFileTest
             {
                 File file1 = writeTemporaryFile(new byte[16]);
                 try (final ChannelProxy channel = new ChannelProxy(file1);
-                     final RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L))
+                     final RandomAccessReader file = new RandomAccessReader.Builder(channel)
+                                                     .bufferSize(bufferSize)
+                                                     .build())
                 {
-                    expectEOF(new Callable<Object>()
-                    {
-                        public Object call() throws IOException
-                        {
-                            file.readFully(target, offset, 17);
-                            return null;
-                        }
-                    });
+                    expectEOF(() -> { file.readFully(target, offset, 17); return null; });
                 }
             }
 
@@ -370,15 +343,11 @@ public class BufferedRandomAccessFileTest
             {
                 File file1 = writeTemporaryFile(new byte[16]);
                 try (final ChannelProxy channel = new ChannelProxy(file1);
-                     final RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L))
+                     final RandomAccessReader file = new RandomAccessReader.Builder(channel).bufferSize(bufferSize).build())
                 {
-                    expectEOF(new Callable<Object>()
-                    {
-                        public Object call() throws IOException
-                        {
-                            while (true)
-                                file.readFully(target, 0, n);
-                        }
+                    expectEOF(() -> {
+                        while (true)
+                            file.readFully(target, 0, n);
                     });
                 }
             }
@@ -459,30 +428,17 @@ public class BufferedRandomAccessFileTest
 
         r.close(); // closing to test read after close
 
-        expectException(new Callable<Object>()
-        {
-            public Object call()
-            {
-                return r.read();
-            }
-        }, AssertionError.class);
+        expectException(() -> r.read(), NullPointerException.class);
 
         //Used to throw ClosedChannelException, but now that it extends BDOSP it just NPEs on the buffer
         //Writing to a BufferedOutputStream that is closed generates no error
         //Going to allow the NPE to throw to catch as a bug any use after close. Notably it won't throw NPE for a
         //write of a 0 length, but that is kind of a corner case
-        expectException(new Callable<Object>()
-        {
-            public Object call() throws IOException
-            {
-                w.write(generateByteArray(1));
-                return null;
-            }
-        }, NullPointerException.class);
+        expectException(() -> { w.write(generateByteArray(1)); return null; }, NullPointerException.class);
 
         try (RandomAccessReader copy = RandomAccessReader.open(new File(r.getPath())))
         {
-            ByteBuffer contents = copy.readBytes((int) copy.length());
+            ByteBuffer contents = ByteBufferUtil.read(copy, (int) copy.length());
 
             assertEquals(contents.limit(), data.length);
             assertEquals(ByteBufferUtil.compare(contents, data), 0);
@@ -526,7 +482,7 @@ public class BufferedRandomAccessFileTest
         channel.close();
     }
 
-    @Test (expected = AssertionError.class)
+    @Test(expected = AssertionError.class)
     public void testAssertionErrorWhenBytesPastMarkIsNegative() throws IOException
     {
         try (SequentialWriter w = createTempFile("brafAssertionErrorWhenBytesPastMarkIsNegative"))
@@ -565,14 +521,7 @@ public class BufferedRandomAccessFileTest
         assertTrue(copy.bytesRemaining() == 0 && copy.isEOF());
 
         // can't seek past the end of the file for read-only files
-        expectException(new Callable<Object>()
-        {
-            public Object call()
-            {
-                copy.seek(copy.length() + 1);
-                return null;
-            }
-        }, IllegalArgumentException.class);
+        expectException(() -> { copy.seek(copy.length() + 1); return null; }, IllegalArgumentException.class);
 
         copy.seek(0);
         copy.skipBytes(5);
@@ -582,7 +531,7 @@ public class BufferedRandomAccessFileTest
         assertTrue(!copy.isEOF());
 
         copy.seek(0);
-        ByteBuffer contents = copy.readBytes((int) copy.length());
+        ByteBuffer contents = ByteBufferUtil.read(copy, (int) copy.length());
 
         assertEquals(contents.limit(), copy.length());
         assertTrue(ByteBufferUtil.compare(contents, data) == 0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java
new file mode 100644
index 0000000..57428af
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
+import org.apache.cassandra.io.util.ChecksummedSequentialWriter;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
+
+public class ChecksummedRandomAccessReaderTest
+{
+    @Test
+    public void readFully() throws IOException
+    {
+        final File data = File.createTempFile("testReadFully", "data");
+        final File crc = File.createTempFile("testReadFully", "crc");
+
+        final byte[] expected = new byte[70 * 1024];   // bit more than crc chunk size, so we can test rebuffering.
+        ThreadLocalRandom.current().nextBytes(expected);
+
+        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+        writer.write(expected);
+        writer.finish();
+
+        assert data.exists();
+
+        RandomAccessReader reader = new ChecksummedRandomAccessReader.Builder(data, crc).build();
+        byte[] b = new byte[expected.length];
+        reader.readFully(b);
+
+        assertArrayEquals(expected, b);
+
+        assertTrue(reader.isEOF());
+
+        reader.close();
+    }
+
+    @Test
+    public void seek() throws IOException
+    {
+        final File data = File.createTempFile("testSeek", "data");
+        final File crc = File.createTempFile("testSeek", "crc");
+
+        final byte[] dataBytes = new byte[70 * 1024];   // bit more than crc chunk size
+        ThreadLocalRandom.current().nextBytes(dataBytes);
+
+        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+        writer.write(dataBytes);
+        writer.finish();
+
+        assert data.exists();
+
+        RandomAccessReader reader = new ChecksummedRandomAccessReader.Builder(data, crc).build();
+
+        final int seekPosition = 66000;
+        reader.seek(seekPosition);
+
+        byte[] b = new byte[dataBytes.length - seekPosition];
+        reader.readFully(b);
+
+        byte[] expected = Arrays.copyOfRange(dataBytes, seekPosition, dataBytes.length);
+
+        assertArrayEquals(expected, b);
+
+        assertTrue(reader.isEOF());
+
+        reader.close();
+    }
+
+    @Test(expected = ChecksummedRandomAccessReader.CorruptFileException.class)
+    public void corruptionDetection() throws IOException
+    {
+        final File data = File.createTempFile("corruptionDetection", "data");
+        final File crc = File.createTempFile("corruptionDetection", "crc");
+
+        final byte[] expected = new byte[5 * 1024];
+        Arrays.fill(expected, (byte) 0);
+
+        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+        writer.write(expected);
+        writer.finish();
+
+        assert data.exists();
+
+        // simulate corruption of file
+        try (RandomAccessFile dataFile = new RandomAccessFile(data, "rw"))
+        {
+            dataFile.seek(1024);
+            dataFile.write((byte) 5);
+        }
+
+        RandomAccessReader reader = new ChecksummedRandomAccessReader.Builder(data, crc).build();
+        byte[] b = new byte[expected.length];
+        reader.readFully(b);
+
+        assertArrayEquals(expected, b);
+
+        assertTrue(reader.isEOF());
+
+        reader.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java
new file mode 100644
index 0000000..fcee9b7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import com.google.common.primitives.Ints;
+import org.junit.Test;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FileSegmentInputStreamTest
+{
+    private ByteBuffer allocateBuffer(int size)
+    {
+        ByteBuffer ret = ByteBuffer.allocate(Ints.checkedCast(size));
+        long seed = System.nanoTime();
+        //seed = 365238103404423L;
+        System.out.println("Seed " + seed);
+
+        new Random(seed).nextBytes(ret.array());
+        return ret;
+    }
+
+    @Test
+    public void testRead() throws IOException
+    {
+        testRead(0, 4096, 1024);
+        testRead(1024, 4096, 1024);
+        testRead(4096, 4096, 1024);
+    }
+
+    private void testRead(int offset, int size, int checkInterval) throws IOException
+    {
+        final ByteBuffer buffer = allocateBuffer(size);
+        final String path = buffer.toString();
+
+        FileSegmentInputStream reader = new FileSegmentInputStream(buffer.duplicate(), path, offset);
+        assertEquals(path, reader.getPath());
+
+        for (int i = offset; i < (size + offset); i += checkInterval)
+        {
+            reader.seek(i);
+            assertFalse(reader.isEOF());
+            assertEquals(i, reader.getFilePointer());
+
+            buffer.position(i - offset);
+
+            int remaining = buffer.remaining();
+            assertEquals(remaining, reader.bytesRemaining());
+            byte[] expected = new byte[buffer.remaining()];
+            buffer.get(expected);
+            assertTrue(Arrays.equals(expected, ByteBufferUtil.read(reader, remaining).array()));
+
+            assertTrue(reader.isEOF());
+            assertEquals(0, reader.bytesRemaining());
+            assertEquals(buffer.capacity() + offset, reader.getFilePointer());
+        }
+
+        reader.close();
+        reader.close();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testMarkNotSupported() throws Exception
+    {
+        FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 0);
+        assertFalse(reader.markSupported());
+        assertEquals(0, reader.bytesPastMark(null));
+        reader.mark();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testResetNotSupported() throws Exception
+    {
+        FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 0);
+        reader.reset(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSeekNegative() throws Exception
+    {
+        FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 0);
+        reader.seek(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSeekBeforeOffset() throws Exception
+    {
+        FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 1024);
+        reader.seek(1023);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSeekPastLength() throws Exception
+    {
+        FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 1024);
+        reader.seek(2049);
+    }
+
+    @Test(expected = EOFException.class)
+    public void testReadBytesTooMany() throws Exception
+    {
+        FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 1024);
+        ByteBufferUtil.read(reader, 2049);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/MemoryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/MemoryTest.java b/test/unit/org/apache/cassandra/io/util/MemoryTest.java
index 9be69ac..81dee7e 100644
--- a/test/unit/org/apache/cassandra/io/util/MemoryTest.java
+++ b/test/unit/org/apache/cassandra/io/util/MemoryTest.java
@@ -18,8 +18,11 @@
 */
 package org.apache.cassandra.io.util;
 
+import java.io.EOFException;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.Arrays;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.junit.Test;
@@ -27,6 +30,10 @@ import org.junit.Test;
 import junit.framework.Assert;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class MemoryTest
 {
 
@@ -45,6 +52,36 @@ public class MemoryTest
         memory.close();
     }
 
+    @Test
+    public void testInputStream() throws IOException
+    {
+        byte[] bytes = new byte[4096];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        final Memory memory = Memory.allocate(bytes.length);
+        memory.setBytes(0, bytes, 0, bytes.length);
+
+        try(MemoryInputStream stream = new MemoryInputStream(memory, 1024))
+        {
+            byte[] bb = new byte[bytes.length];
+            assertEquals(bytes.length, stream.available());
+
+            stream.readFully(bb);
+            assertEquals(0, stream.available());
+
+            assertTrue(Arrays.equals(bytes, bb));
+
+            try
+            {
+                stream.readInt();
+                fail("Expected EOF exception");
+            }
+            catch (EOFException e)
+            {
+                //pass
+            }
+        }
+    }
+
     private static void test(ByteBuffer canon, Memory memory)
     {
         ByteBuffer hollow = MemoryUtil.getHollowDirectByteBuffer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
new file mode 100644
index 0000000..9df3fed
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.google.common.primitives.Ints;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.utils.ChecksumType;
+
+import static junit.framework.Assert.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class MmappedRegionsTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(MmappedRegionsTest.class);
+
+    private static ByteBuffer allocateBuffer(int size)
+    {
+        ByteBuffer ret = ByteBuffer.allocate(Ints.checkedCast(size));
+        long seed = System.nanoTime();
+        //seed = 365238103404423L;
+        logger.info("Seed {}", seed);
+
+        new Random(seed).nextBytes(ret.array());
+        return ret;
+    }
+
+    private static File writeFile(String fileName, ByteBuffer buffer) throws IOException
+    {
+        File ret = File.createTempFile(fileName, "1");
+        ret.deleteOnExit();
+
+        try (SequentialWriter writer = SequentialWriter.open(ret))
+        {
+            writer.write(buffer);
+            writer.finish();
+        }
+
+        assert ret.exists();
+        assert ret.length() >= buffer.capacity();
+        return ret;
+
+    }
+
+    @Test
+    public void testEmpty() throws Exception
+    {
+        ByteBuffer buffer = allocateBuffer(1024);
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testEmpty", buffer));
+            MmappedRegions regions = MmappedRegions.empty(channel))
+        {
+            assertTrue(regions.isEmpty());
+            assertTrue(regions.isValid(channel));
+        }
+    }
+
+    @Test
+    public void testTwoSegments() throws Exception
+    {
+        ByteBuffer buffer = allocateBuffer(2048);
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testTwoSegments", buffer));
+            MmappedRegions regions = MmappedRegions.empty(channel))
+        {
+            regions.extend(1024);
+            for (int i = 0; i < 1024; i++)
+            {
+                MmappedRegions.Region region = regions.floor(i);
+                assertNotNull(region);
+                assertEquals(0, region.bottom());
+                assertEquals(1024, region.top());
+            }
+
+            regions.extend(2048);
+            for (int i = 0; i < 2048; i++)
+            {
+                MmappedRegions.Region region = regions.floor(i);
+                assertNotNull(region);
+                if (i < 1024)
+                {
+                    assertEquals(0, region.bottom());
+                    assertEquals(1024, region.top());
+                }
+                else
+                {
+                    assertEquals(1024, region.bottom());
+                    assertEquals(2048, region.top());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSmallSegmentSize() throws Exception
+    {
+        int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
+        MmappedRegions.MAX_SEGMENT_SIZE = 1024;
+
+        ByteBuffer buffer = allocateBuffer(4096);
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer));
+            MmappedRegions regions = MmappedRegions.empty(channel))
+        {
+            regions.extend(1024);
+            regions.extend(2048);
+            regions.extend(4096);
+
+            final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
+            for (int i = 0; i < buffer.capacity(); i++)
+            {
+                MmappedRegions.Region region = regions.floor(i);
+                assertNotNull(region);
+                assertEquals(SIZE * (i / SIZE), region.bottom());
+                assertEquals(SIZE + (SIZE * (i / SIZE)), region.top());
+            }
+        }
+        finally
+        {
+            MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE;
+        }
+    }
+
+    @Test
+    public void testAllocRegions() throws Exception
+    {
+        int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
+        MmappedRegions.MAX_SEGMENT_SIZE = 1024;
+
+        ByteBuffer buffer = allocateBuffer(MmappedRegions.MAX_SEGMENT_SIZE * MmappedRegions.REGION_ALLOC_SIZE * 3);
+
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testAllocRegions", buffer));
+            MmappedRegions regions = MmappedRegions.empty(channel))
+        {
+            regions.extend(buffer.capacity());
+
+            final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
+            for (int i = 0; i < buffer.capacity(); i++)
+            {
+                MmappedRegions.Region region = regions.floor(i);
+                assertNotNull(region);
+                assertEquals(SIZE * (i / SIZE), region.bottom());
+                assertEquals(SIZE + (SIZE * (i / SIZE)), region.top());
+            }
+        }
+        finally
+        {
+            MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE;
+        }
+    }
+
+    @Test
+    public void testCopy() throws Exception
+    {
+        ByteBuffer buffer = allocateBuffer(128 * 1024);
+
+        MmappedRegions snapshot;
+        ChannelProxy channelCopy;
+
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshot", buffer));
+            MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4))
+        {
+            // create 3 more segments, one per quater capacity
+            regions.extend(buffer.capacity() / 2);
+            regions.extend(3 * buffer.capacity() / 4);
+            regions.extend(buffer.capacity());
+
+            // make a snapshot
+            snapshot = regions.sharedCopy();
+
+            // keep the channel open
+            channelCopy = channel.sharedCopy();
+        }
+
+        assertFalse(snapshot.isCleanedUp());
+
+        final int SIZE = buffer.capacity() / 4;
+        for (int i = 0; i < buffer.capacity(); i++)
+        {
+            MmappedRegions.Region region = snapshot.floor(i);
+            assertNotNull(region);
+            assertEquals(SIZE * (i / SIZE), region.bottom());
+            assertEquals(SIZE + (SIZE * (i / SIZE)), region.top());
+
+            // check we can access the buffer
+            assertNotNull(region.buffer.duplicate().getInt());
+        }
+
+        assertNull(snapshot.close(null));
+        assertNull(channelCopy.close(null));
+        assertTrue(snapshot.isCleanedUp());
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testCopyCannotExtend() throws Exception
+    {
+        ByteBuffer buffer = allocateBuffer(128 * 1024);
+
+        MmappedRegions snapshot;
+        ChannelProxy channelCopy;
+
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshotCannotExtend", buffer));
+            MmappedRegions regions = MmappedRegions.empty(channel))
+        {
+            regions.extend(buffer.capacity() / 2);
+
+            // make a snapshot
+            snapshot = regions.sharedCopy();
+
+            // keep the channel open
+            channelCopy = channel.sharedCopy();
+        }
+
+        try
+        {
+            snapshot.extend(buffer.capacity());
+        }
+        finally
+        {
+            assertNull(snapshot.close(null));
+            assertNull(channelCopy.close(null));
+        }
+    }
+
+    @Test
+    public void testExtendOutOfOrder() throws Exception
+    {
+        ByteBuffer buffer = allocateBuffer(4096);
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testExtendOutOfOrder", buffer));
+            MmappedRegions regions = MmappedRegions.empty(channel))
+        {
+            regions.extend(4096);
+            regions.extend(1024);
+            regions.extend(2048);
+
+            for (int i = 0; i < buffer.capacity(); i++)
+            {
+                MmappedRegions.Region region = regions.floor(i);
+                assertNotNull(region);
+                assertEquals(0, region.bottom());
+                assertEquals(4096, region.top());
+            }
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeExtend() throws Exception
+    {
+        ByteBuffer buffer = allocateBuffer(1024);
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testNegativeExtend", buffer));
+            MmappedRegions regions = MmappedRegions.empty(channel))
+        {
+            regions.extend(-1);
+        }
+    }
+
+    @Test
+    public void testMapForCompressionMetadata() throws Exception
+    {
+        int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
+        MmappedRegions.MAX_SEGMENT_SIZE = 1024;
+
+        ByteBuffer buffer = allocateBuffer(128 * 1024);
+        File f = File.createTempFile("testMapForCompressionMetadata", "1");
+        f.deleteOnExit();
+
+        File cf = File.createTempFile(f.getName() + ".metadata", "1");
+        cf.deleteOnExit();
+
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance))
+                                                     .replayPosition(null);
+        try(SequentialWriter writer = new CompressedSequentialWriter(f,
+                                                                     cf.getAbsolutePath(),
+                                                                     CompressionParams.snappy(),
+                                                                     sstableMetadataCollector))
+        {
+            writer.write(buffer);
+            writer.finish();
+        }
+
+        CompressionMetadata metadata = new CompressionMetadata(cf.getAbsolutePath(), f.length(), ChecksumType.CRC32);
+        try(ChannelProxy channel = new ChannelProxy(f);
+            MmappedRegions regions = MmappedRegions.map(channel, metadata))
+        {
+
+            assertFalse(regions.isEmpty());
+            int i = 0;
+            while(i < buffer.capacity())
+            {
+                CompressionMetadata.Chunk chunk = metadata.chunkFor(i);
+
+                MmappedRegions.Region region = regions.floor(chunk.offset);
+                assertNotNull(region);
+
+                ByteBuffer compressedChunk = region.buffer.duplicate();
+                assertNotNull(compressedChunk);
+                assertEquals(chunk.length + 4, compressedChunk.capacity());
+
+                assertEquals(chunk.offset, region.bottom());
+                assertEquals(chunk.offset + chunk.length + 4, region.top());
+
+                i += metadata.chunkLength();
+            }
+        }
+        finally
+        {
+            MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE;
+            metadata.close();
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIllegalArgForMap1() throws Exception
+    {
+        ByteBuffer buffer = allocateBuffer(1024);
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap1", buffer));
+            MmappedRegions regions = MmappedRegions.map(channel, 0))
+        {
+            assertTrue(regions.isEmpty());
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIllegalArgForMap2() throws Exception
+    {
+        ByteBuffer buffer = allocateBuffer(1024);
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap2", buffer));
+            MmappedRegions regions = MmappedRegions.map(channel, -1L))
+        {
+            assertTrue(regions.isEmpty());
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIllegalArgForMap3() throws Exception
+    {
+        ByteBuffer buffer = allocateBuffer(1024);
+        try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap3", buffer));
+            MmappedRegions regions = MmappedRegions.map(channel, null))
+        {
+            assertTrue(regions.isEmpty());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
index 3aad7e9..3ebbc67 100644
--- a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
@@ -14,7 +14,6 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.Random;
 
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
@@ -180,17 +179,10 @@ public class NIODataInputStreamTest
     }
 
     @SuppressWarnings("resource")
-    @Test(expected = IllegalArgumentException.class)
-    public void testTooSmallBufferSize() throws Exception
-    {
-        new NIODataInputStream(new FakeChannel(), 4);
-    }
-
-    @SuppressWarnings("resource")
     @Test(expected = NullPointerException.class)
     public void testNullRBC() throws Exception
     {
-        new NIODataInputStream(null, 8);
+        new NIODataInputStream(null, 9);
     }
 
     @SuppressWarnings("resource")
@@ -769,7 +761,7 @@ public class NIODataInputStreamTest
                 out.writeUnsignedVInt(value);
 
                 buf.position(ii);
-                NIODataInputStream in = new DataInputBuffer(buf, false);
+                RebufferingInputStream in = new DataInputBuffer(buf, false);
 
                 assertEquals(value, in.readUnsignedVInt());
             }
@@ -792,7 +784,7 @@ public class NIODataInputStreamTest
             out.writeUnsignedVInt(value);
 
             buf.position(0);
-            NIODataInputStream in = new DataInputBuffer(buf, false);
+            RebufferingInputStream in = new DataInputBuffer(buf, false);
 
             assertEquals(value, in.readUnsignedVInt());
 
@@ -831,7 +823,7 @@ public class NIODataInputStreamTest
             truncated.put(buf);
             truncated.flip();
 
-            NIODataInputStream in = new DataInputBuffer(truncated, false);
+            RebufferingInputStream in = new DataInputBuffer(truncated, false);
 
             boolean threw = false;
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
new file mode 100644
index 0000000..f0d4383
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
@@ -0,0 +1,483 @@
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class RandomAccessReaderTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(RandomAccessReaderTest.class);
+
+    private static final class Parameters
+    {
+        public final long fileLength;
+        public final int bufferSize;
+
+        public BufferType bufferType;
+        public int maxSegmentSize;
+        public boolean mmappedRegions;
+        public byte[] expected;
+
+        Parameters(long fileLength, int bufferSize)
+        {
+            this.fileLength = fileLength;
+            this.bufferSize = bufferSize;
+            this.bufferType = BufferType.OFF_HEAP;
+            this.maxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE;
+            this.mmappedRegions = false;
+            this.expected = "The quick brown fox jumps over the lazy dog".getBytes(FileUtils.CHARSET);
+        }
+
+        public Parameters mmappedRegions(boolean mmappedRegions)
+        {
+            this.mmappedRegions = mmappedRegions;
+            return this;
+        }
+
+        public Parameters bufferType(BufferType bufferType)
+        {
+            this.bufferType = bufferType;
+            return this;
+        }
+
+        public Parameters maxSegmentSize(int maxSegmentSize)
+        {
+            this.maxSegmentSize = maxSegmentSize;
+            return this;
+        }
+
+        public Parameters expected(byte[] expected)
+        {
+            this.expected = expected;
+            return this;
+        }
+    }
+
+    @Test
+    public void testBufferedOffHeap() throws IOException
+    {
+        testReadFully(new Parameters(8192, 4096).bufferType(BufferType.OFF_HEAP));
+    }
+
+    @Test
+    public void testBufferedOnHeap() throws IOException
+    {
+        testReadFully(new Parameters(8192, 4096).bufferType(BufferType.ON_HEAP));
+    }
+
+    @Test
+    public void testBigBufferSize() throws IOException
+    {
+        testReadFully(new Parameters(8192, 65536).bufferType(BufferType.ON_HEAP));
+    }
+
+    @Test
+    public void testTinyBufferSize() throws IOException
+    {
+        testReadFully(new Parameters(8192, 16).bufferType(BufferType.ON_HEAP));
+    }
+
+    @Test
+    public void testOneSegment() throws IOException
+    {
+        testReadFully(new Parameters(8192, 4096).mmappedRegions(true));
+    }
+
+    @Test
+    public void testMultipleSegments() throws IOException
+    {
+        testReadFully(new Parameters(8192, 4096).mmappedRegions(true).maxSegmentSize(1024));
+    }
+
+    @Test
+    public void testVeryLarge() throws IOException
+    {
+        final long SIZE = 1L << 32; // 2GB
+        Parameters params = new Parameters(SIZE, 1 << 20); // 1MB
+
+        try(ChannelProxy channel = new ChannelProxy("abc", new FakeFileChannel(SIZE)))
+        {
+            RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel)
+                                                 .bufferType(params.bufferType)
+                                                 .bufferSize(params.bufferSize);
+
+            try(RandomAccessReader reader = builder.build())
+            {
+                assertEquals(channel.size(), reader.length());
+                assertEquals(channel.size(), reader.bytesRemaining());
+                assertEquals(Integer.MAX_VALUE, reader.available());
+
+                assertEquals(channel.size(), reader.skip(channel.size()));
+
+                assertTrue(reader.isEOF());
+                assertEquals(0, reader.bytesRemaining());
+            }
+        }
+    }
+
+    /** A fake file channel that simply increments the position and doesn't
+     * actually read anything. We use it to simulate very large files, > 2G.
+     */
+    private static final class FakeFileChannel extends FileChannel
+    {
+        private final long size;
+        private long position;
+
+        FakeFileChannel(long size)
+        {
+            this.size = size;
+        }
+
+        public int read(ByteBuffer dst)
+        {
+            int ret = dst.remaining();
+            position += ret;
+            dst.position(dst.limit());
+            return ret;
+        }
+
+        public long read(ByteBuffer[] dsts, int offset, int length)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public int write(ByteBuffer src)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public long write(ByteBuffer[] srcs, int offset, int length)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public long position()
+        {
+            return position;
+        }
+
+        public FileChannel position(long newPosition)
+        {
+            position = newPosition;
+            return this;
+        }
+
+        public long size()
+        {
+            return size;
+        }
+
+        public FileChannel truncate(long size)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void force(boolean metaData)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public long transferTo(long position, long count, WritableByteChannel target)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public long transferFrom(ReadableByteChannel src, long position, long count)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public int read(ByteBuffer dst, long position)
+        {
+            int ret = dst.remaining();
+            this.position = position + ret;
+            dst.position(dst.limit());
+            return ret;
+        }
+
+        public int write(ByteBuffer src, long position)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public MappedByteBuffer map(MapMode mode, long position, long size)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public FileLock lock(long position, long size, boolean shared)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public FileLock tryLock(long position, long size, boolean shared)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        protected void implCloseChannel()
+        {
+
+        }
+    }
+
+    private static File writeFile(Parameters params) throws IOException
+    {
+        final File f = File.createTempFile("testReadFully", "1");
+        f.deleteOnExit();
+
+        try(SequentialWriter writer = SequentialWriter.open(f))
+        {
+            long numWritten = 0;
+            while (numWritten < params.fileLength)
+            {
+                writer.write(params.expected);
+                numWritten += params.expected.length;
+            }
+
+            writer.finish();
+        }
+
+        assert f.exists();
+        assert f.length() >= params.fileLength;
+        return f;
+    }
+
+    private static void testReadFully(Parameters params) throws IOException
+    {
+        final File f = writeFile(params);
+        try(ChannelProxy channel = new ChannelProxy(f))
+        {
+            RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel)
+                                                 .bufferType(params.bufferType)
+                                                 .bufferSize(params.bufferSize);
+            if (params.mmappedRegions)
+                builder.regions(MmappedRegions.map(channel, f.length()));
+
+            try(RandomAccessReader reader = builder.build())
+            {
+                assertEquals(f.getAbsolutePath(), reader.getPath());
+                assertEquals(f.length(), reader.length());
+                assertEquals(f.length(), reader.bytesRemaining());
+                assertEquals(Math.min(Integer.MAX_VALUE, f.length()), reader.available());
+
+                byte[] b = new byte[params.expected.length];
+                long numRead = 0;
+                while (numRead < params.fileLength)
+                {
+                    reader.readFully(b);
+                    assertTrue(Arrays.equals(params.expected, b));
+                    numRead += b.length;
+                }
+
+                assertTrue(reader.isEOF());
+                assertEquals(0, reader.bytesRemaining());
+            }
+
+            if (builder.regions != null)
+                assertNull(builder.regions.close(null));
+        }
+    }
+
+    @Test
+    public void testReadBytes() throws IOException
+    {
+        File f = File.createTempFile("testReadBytes", "1");
+        final String expected = "The quick brown fox jumps over the lazy dog";
+
+        try(SequentialWriter writer = SequentialWriter.open(f))
+        {
+            writer.write(expected.getBytes());
+            writer.finish();
+        }
+
+        assert f.exists();
+
+        try(ChannelProxy channel = new ChannelProxy(f);
+            RandomAccessReader reader = new RandomAccessReader.Builder(channel).build())
+        {
+            assertEquals(f.getAbsolutePath(), reader.getPath());
+            assertEquals(expected.length(), reader.length());
+
+            ByteBuffer b = ByteBufferUtil.read(reader, expected.length());
+            assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
+
+            assertTrue(reader.isEOF());
+            assertEquals(0, reader.bytesRemaining());
+        }
+    }
+
+    @Test
+    public void testReset() throws IOException
+    {
+        File f = File.createTempFile("testMark", "1");
+        final String expected = "The quick brown fox jumps over the lazy dog";
+        final int numIterations = 10;
+
+        try(SequentialWriter writer = SequentialWriter.open(f))
+        {
+            for (int i = 0; i < numIterations; i++)
+                writer.write(expected.getBytes());
+            writer.finish();
+        }
+
+        assert f.exists();
+
+        try(ChannelProxy channel = new ChannelProxy(f);
+        RandomAccessReader reader = new RandomAccessReader.Builder(channel).build())
+        {
+            assertEquals(expected.length() * numIterations, reader.length());
+
+            ByteBuffer b = ByteBufferUtil.read(reader, expected.length());
+            assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
+
+            assertFalse(reader.isEOF());
+            assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining());
+
+            FileMark mark = reader.mark();
+            assertEquals(0, reader.bytesPastMark());
+            assertEquals(0, reader.bytesPastMark(mark));
+
+            for (int i = 0; i < (numIterations - 1); i++)
+            {
+                b = ByteBufferUtil.read(reader, expected.length());
+                assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
+            }
+            assertTrue(reader.isEOF());
+            assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark());
+            assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark(mark));
+
+            reader.reset(mark);
+            assertEquals(0, reader.bytesPastMark());
+            assertEquals(0, reader.bytesPastMark(mark));
+            assertFalse(reader.isEOF());
+            for (int i = 0; i < (numIterations - 1); i++)
+            {
+                b = ByteBufferUtil.read(reader, expected.length());
+                assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
+            }
+
+            reader.reset();
+            assertEquals(0, reader.bytesPastMark());
+            assertEquals(0, reader.bytesPastMark(mark));
+            assertFalse(reader.isEOF());
+            for (int i = 0; i < (numIterations - 1); i++)
+            {
+                b = ByteBufferUtil.read(reader, expected.length());
+                assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
+            }
+
+            assertTrue(reader.isEOF());
+        }
+    }
+
+    @Test
+    public void testSeekSingleThread() throws IOException, InterruptedException
+    {
+        testSeek(1);
+    }
+
+    @Test
+    public void testSeekMultipleThreads() throws IOException, InterruptedException
+    {
+        testSeek(10);
+    }
+
+    private static void testSeek(int numThreads) throws IOException, InterruptedException
+    {
+        final File f = File.createTempFile("testMark", "1");
+        final byte[] expected = new byte[1 << 16];
+
+        long seed = System.nanoTime();
+        //seed = 365238103404423L;
+        logger.info("Seed {}", seed);
+        Random r = new Random(seed);
+        r.nextBytes(expected);
+
+        try(SequentialWriter writer = SequentialWriter.open(f))
+        {
+            writer.write(expected);
+            writer.finish();
+        }
+
+        assert f.exists();
+
+        try(final ChannelProxy channel = new ChannelProxy(f))
+        {
+            final Runnable worker = () ->
+            {
+                try(RandomAccessReader reader = new RandomAccessReader.Builder(channel).build())
+                {
+                    assertEquals(expected.length, reader.length());
+
+                    ByteBuffer b = ByteBufferUtil.read(reader, expected.length);
+                    assertTrue(Arrays.equals(expected, b.array()));
+                    assertTrue(reader.isEOF());
+                    assertEquals(0, reader.bytesRemaining());
+
+                    reader.seek(0);
+                    b = ByteBufferUtil.read(reader, expected.length);
+                    assertTrue(Arrays.equals(expected, b.array()));
+                    assertTrue(reader.isEOF());
+                    assertEquals(0, reader.bytesRemaining());
+
+                    for (int i = 0; i < 10; i++)
+                    {
+                        int pos = r.nextInt(expected.length);
+                        reader.seek(pos);
+                        assertEquals(pos, reader.getPosition());
+
+                        ByteBuffer buf = ByteBuffer.wrap(expected, pos, expected.length - pos)
+                                                   .order(ByteOrder.BIG_ENDIAN);
+
+                        while (reader.bytesRemaining() > 4)
+                            assertEquals(buf.getInt(), reader.readInt());
+                    }
+
+                    reader.close();
+                }
+                catch (Exception ex)
+                {
+                    ex.printStackTrace();
+                    fail(ex.getMessage());
+                }
+            };
+
+            if (numThreads == 1)
+            {
+                worker.run();
+            }
+            else
+            {
+                ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+                for (int i = 0; i < numThreads; i++)
+                    executor.submit(worker);
+
+                executor.shutdown();
+                executor.awaitTermination(1, TimeUnit.MINUTES);
+            }
+        }
+    }
+}


Mime
View raw message