cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject cassandra git commit: Fix regression with compressed reader performance due to no pooling and excessive mapping/unmapping
Date Thu, 07 May 2015 10:31:36 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk ea2ee3703 -> aedce5fc6


Fix regression with compressed reader performance
due to no pooling and excessive mapping/unmapping

patch by benedict; reviewed by tjake for CASSANDRA-9240


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aedce5fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aedce5fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aedce5fc

Branch: refs/heads/trunk
Commit: aedce5fc6ba46ca734e91190cfaaeb23ba47a846
Parents: ea2ee37
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Thu May 7 11:31:08 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Thu May 7 11:31:08 2015 +0100

----------------------------------------------------------------------
 .../compress/CompressedRandomAccessReader.java  | 94 ++++++--------------
 .../io/compress/CompressedThrottledReader.java  |  9 +-
 .../io/compress/DeflateCompressor.java          |  7 +-
 .../cassandra/io/compress/LZ4Compressor.java    |  6 +-
 .../cassandra/io/compress/SnappyCompressor.java |  2 +
 .../io/util/CompressedPoolingSegmentedFile.java | 39 ++++++--
 .../io/util/CompressedSegmentedFile.java        | 71 ++++++++++++++-
 .../cassandra/io/util/ICompressedFile.java      |  5 ++
 .../cassandra/io/util/RandomAccessReader.java   |  4 +-
 .../apache/cassandra/io/util/SegmentedFile.java |  2 +-
 10 files changed, 152 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index edf8c68..1febe37 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -42,17 +42,23 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public class CompressedRandomAccessReader extends RandomAccessReader
 {
-    private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap;
-
     public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata)
     {
-        return open(channel, metadata, null);
+        try
+        {
+            return new CompressedRandomAccessReader(channel, metadata, null);
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
-    public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata, CompressedPoolingSegmentedFile owner)
+
+    public static CompressedRandomAccessReader open(ICompressedFile file)
     {
         try
         {
-            return new CompressedRandomAccessReader(channel, metadata, owner);
+            return new CompressedRandomAccessReader(file.channel(), file.getMetadata(), file);
         }
         catch (FileNotFoundException e)
         {
@@ -60,9 +66,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         }
     }
 
-
-    private TreeMap<Long, MappedByteBuffer> chunkSegments;
-    private int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
+    private final TreeMap<Long, MappedByteBuffer> chunkSegments;
 
     private final CompressionMetadata metadata;
 
@@ -75,61 +79,24 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     // raw checksum bytes
     private ByteBuffer checksumBytes;
 
-    protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
+    protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file) throws FileNotFoundException
     {
-        super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), owner);
+        super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), file instanceof PoolingSegmentedFile ? (PoolingSegmentedFile) file : null);
         this.metadata = metadata;
         checksum = new Adler32();
 
-        if (!useMmap)
+        chunkSegments = file == null ? null : file.chunkSegments();
+        if (chunkSegments == null)
         {
-            compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
+            compressed = super.allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().useDirectOutputByteBuffers());
             checksumBytes = ByteBuffer.wrap(new byte[4]);
         }
-        else
-        {
-            try
-            {
-                createMappedSegments();
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-    }
-
-    private void createMappedSegments() throws IOException
-    {
-        chunkSegments = new TreeMap<>();
-        long offset = 0;
-        long lastSegmentOffset = 0;
-        long segmentSize = 0;
-
-        while (offset < metadata.dataLength)
-        {
-            CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
-
-            //Reached a new mmap boundary
-            if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
-            {
-                chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
-                lastSegmentOffset += segmentSize;
-                segmentSize = 0;
-            }
-
-            segmentSize += chunk.length + 4; //checksum
-            offset += metadata.chunkLength();
-        }
-
-        if (segmentSize > 0)
-            chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
     }
 
     protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
     {
         assert Integer.bitCount(bufferSize) == 1;
-        return useMmap && useDirect
+        return useDirect
                 ? ByteBuffer.allocateDirect(bufferSize)
                 : ByteBuffer.allocate(bufferSize);
     }
@@ -138,16 +105,9 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     public void deallocate()
     {
         super.deallocate();
-
-        if (chunkSegments != null)
-        {
-            for (Map.Entry<Long, MappedByteBuffer> entry : chunkSegments.entrySet())
-            {
-                FileUtils.clean(entry.getValue());
-            }
-        }
-
-        chunkSegments = null;
+        if (compressed != null)
+            FileUtils.clean(compressed);
+        compressed = null;
     }
 
     private void reBufferStandard()
@@ -175,7 +135,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
             int decompressedBytes;
             try
             {
-                decompressedBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer.array(), 0);
+                decompressedBytes = metadata.compressor().uncompress(compressed, buffer);
                 buffer.limit(decompressedBytes);
             }
             catch (IOException e)
@@ -186,8 +146,8 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
             if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
             {
-
-                checksum.update(compressed.array(), 0, chunk.length);
+                compressed.position(0);
+                FBUtilities.directCheckSum(checksum, compressed);
 
                 if (checksum(chunk) != (int) checksum.getValue())
                     throw new CorruptBlockException(getPath(), chunk);
@@ -226,7 +186,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
             Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
             long segmentOffset = entry.getKey();
             int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
-            MappedByteBuffer compressedChunk = entry.getValue();
+            ByteBuffer compressedChunk = entry.getValue().duplicate();
 
             compressedChunk.position(chunkOffset);
             compressedChunk.limit(chunkOffset + chunk.length);
@@ -284,7 +244,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     @Override
     protected void reBuffer()
     {
-        if (useMmap)
+        if (chunkSegments != null)
         {
             reBufferMmap();
         }
@@ -305,7 +265,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
     public int getTotalBufferSize()
     {
-        return super.getTotalBufferSize() + (useMmap ? 0 : compressed.capacity());
+        return super.getTotalBufferSize() + (chunkSegments != null ? 0 : compressed.capacity());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
index 63727d8..a29129c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
@@ -26,14 +26,15 @@ import java.io.FileNotFoundException;
 import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.ICompressedFile;
 
 public class CompressedThrottledReader extends CompressedRandomAccessReader
 {
     private final RateLimiter limiter;
 
-    public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, RateLimiter limiter) throws FileNotFoundException
+    public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file, RateLimiter limiter) throws FileNotFoundException
     {
-        super(channel, metadata, null);
+        super(channel, metadata, file);
         this.limiter = limiter;
     }
 
@@ -43,11 +44,11 @@ public class CompressedThrottledReader extends CompressedRandomAccessReader
         super.reBuffer();
     }
 
-    public static CompressedThrottledReader open(ChannelProxy channel, CompressionMetadata metadata, RateLimiter limiter)
+    public static CompressedThrottledReader open(ICompressedFile file, RateLimiter limiter)
     {
         try
         {
-            return new CompressedThrottledReader(channel, metadata, limiter);
+            return new CompressedThrottledReader(file.channel(), file.getMetadata(), file, limiter);
         }
         catch (FileNotFoundException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index a88e4d2..833c375 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -122,13 +122,14 @@ public class DeflateCompressor implements ICompressor
         }
     }
 
-    public int uncompress(ByteBuffer input_, ByteBuffer output) throws IOException
+    public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
     {
         if (!output.hasArray())
             throw new IllegalArgumentException("DeflateCompressor doesn't work with direct byte buffers");
 
-        byte[] input = ByteBufferUtil.getArray(input_);
-        return uncompress(input, 0, input.length, output.array(), output.arrayOffset() + output.position());
+        if (input.hasArray())
+            return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
+        return uncompress(ByteBufferUtil.getArray(input), 0, input.remaining(), output.array(), output.arrayOffset() + output.position());
     }
 
     public boolean useDirectOutputByteBuffers()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index ab10a00..9d54048 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -83,6 +83,7 @@ public class LZ4Compressor implements ICompressor
                 | ((input[inputOffset + 1] & 0xFF) << 8)
                 | ((input[inputOffset + 2] & 0xFF) << 16)
                 | ((input[inputOffset + 3] & 0xFF) << 24);
+
         final int compressedLength;
         try
         {
@@ -104,6 +105,9 @@ public class LZ4Compressor implements ICompressor
 
     public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
     {
+        if (input.hasArray() && output.hasArray())
+            return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
+
         int pos = input.position();
         final int decompressedLength = (input.get(pos) & 0xFF)
                 | ((input.get(pos + 1) & 0xFF) << 8)
@@ -132,7 +136,7 @@ public class LZ4Compressor implements ICompressor
     @Override
     public boolean useDirectOutputByteBuffers()
     {
-        return false;
+        return true;
     }
 
     public Set<String> supportedOptions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
index d1f1f34..04f676b 100644
--- a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
@@ -96,6 +96,8 @@ public class SnappyCompressor implements ICompressor
 
     public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
     {
+        if (input.hasArray() && output.hasArray())
+            return Snappy.rawUncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
         return Snappy.uncompress(input, output);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 502c461..cb30131 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -17,6 +17,10 @@
 */
 package org.apache.cassandra.io.util;
 
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.TreeMap;
+
 import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
@@ -27,31 +31,56 @@ import org.apache.cassandra.io.compress.CompressionMetadata;
 public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
 {
     public final CompressionMetadata metadata;
+    private final TreeMap<Long, MappedByteBuffer> chunkSegments;
 
     public CompressedPoolingSegmentedFile(ChannelProxy channel, CompressionMetadata metadata)
     {
-        super(new Cleanup(channel, metadata), channel, metadata.dataLength, metadata.compressedFileLength);
+        this(channel, metadata, CompressedSegmentedFile.createMappedSegments(channel, metadata));
+    }
+
+    private CompressedPoolingSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
+    {
+        super(new Cleanup(channel, metadata, chunkSegments), channel, metadata.dataLength, metadata.compressedFileLength);
         this.metadata = metadata;
+        this.chunkSegments = chunkSegments;
     }
 
     private CompressedPoolingSegmentedFile(CompressedPoolingSegmentedFile copy)
     {
         super(copy);
         this.metadata = copy.metadata;
+        this.chunkSegments = copy.chunkSegments;
+    }
+
+    public ChannelProxy channel()
+    {
+        return channel;
+    }
+
+    public TreeMap<Long, MappedByteBuffer> chunkSegments()
+    {
+        return chunkSegments;
     }
 
     protected static final class Cleanup extends PoolingSegmentedFile.Cleanup
     {
         final CompressionMetadata metadata;
-        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata)
+        final TreeMap<Long, MappedByteBuffer> chunkSegments;
+        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
         {
             super(channel);
             this.metadata = metadata;
+            this.chunkSegments = chunkSegments;
         }
         public void tidy()
         {
             super.tidy();
             metadata.close();
+            if (chunkSegments != null)
+            {
+                for (MappedByteBuffer segment : chunkSegments.values())
+                    FileUtils.clean(segment);
+            }
         }
     }
 
@@ -82,17 +111,17 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
 
     public RandomAccessReader createReader()
     {
-        return CompressedRandomAccessReader.open(channel, metadata, null);
+        return CompressedRandomAccessReader.open(this);
     }
 
     public RandomAccessReader createThrottledReader(RateLimiter limiter)
     {
-        return CompressedThrottledReader.open(channel, metadata, limiter);
+        return CompressedThrottledReader.open(this, limiter);
     }
 
     protected RandomAccessReader createPooledReader()
     {
-        return CompressedRandomAccessReader.open(channel, metadata, this);
+        return CompressedRandomAccessReader.open(this);
     }
 
     public CompressionMetadata getMetadata()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 5d2c897..caf4c22 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -17,8 +17,14 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.TreeMap;
+
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressedThrottledReader;
@@ -27,31 +33,88 @@ import org.apache.cassandra.io.compress.CompressionMetadata;
 public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
 {
     public final CompressionMetadata metadata;
+    private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap;
+    private static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
+    private final TreeMap<Long, MappedByteBuffer> chunkSegments;
 
     public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata)
     {
-        super(new Cleanup(channel, metadata), channel, metadata.dataLength, metadata.compressedFileLength);
+        this(channel, metadata, createMappedSegments(channel, metadata));
+    }
+
+    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
+    {
+        super(new Cleanup(channel, metadata, chunkSegments), channel, metadata.dataLength, metadata.compressedFileLength);
         this.metadata = metadata;
+        this.chunkSegments = chunkSegments;
     }
 
     private CompressedSegmentedFile(CompressedSegmentedFile copy)
     {
         super(copy);
         this.metadata = copy.metadata;
+        this.chunkSegments = copy.chunkSegments;
+    }
+
+    public ChannelProxy channel()
+    {
+        return channel;
+    }
+
+    public TreeMap<Long, MappedByteBuffer> chunkSegments()
+    {
+        return chunkSegments;
+    }
+
+    static TreeMap<Long, MappedByteBuffer> createMappedSegments(ChannelProxy channel, CompressionMetadata metadata)
+    {
+        if (!useMmap)
+            return null;
+        TreeMap<Long, MappedByteBuffer> chunkSegments = new TreeMap<>();
+        long offset = 0;
+        long lastSegmentOffset = 0;
+        long segmentSize = 0;
+
+        while (offset < metadata.dataLength)
+        {
+            CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
+
+            //Reached a new mmap boundary
+            if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
+            {
+                chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
+                lastSegmentOffset += segmentSize;
+                segmentSize = 0;
+            }
+
+            segmentSize += chunk.length + 4; //checksum
+            offset += metadata.chunkLength();
+        }
+
+        if (segmentSize > 0)
+            chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
+        return chunkSegments;
     }
 
     private static final class Cleanup extends SegmentedFile.Cleanup
     {
         final CompressionMetadata metadata;
-        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata)
+        final TreeMap<Long, MappedByteBuffer> chunkSegments;
+        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
         {
             super(channel);
             this.metadata = metadata;
+            this.chunkSegments = chunkSegments;
         }
         public void tidy()
         {
             super.tidy();
             metadata.close();
+            if (chunkSegments != null)
+            {
+                for (MappedByteBuffer segment : chunkSegments.values())
+                    FileUtils.clean(segment);
+            }
         }
     }
 
@@ -97,12 +160,12 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
 
     public RandomAccessReader createReader()
     {
-        return CompressedRandomAccessReader.open(channel, metadata);
+        return CompressedRandomAccessReader.open(this);
     }
 
     public RandomAccessReader createThrottledReader(RateLimiter limiter)
     {
-        return CompressedThrottledReader.open(channel, metadata, limiter);
+        return CompressedThrottledReader.open(this, limiter);
     }
 
     public CompressionMetadata getMetadata()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/ICompressedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
index 3ca7718..ce7b22c 100644
--- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java
+++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
@@ -17,9 +17,14 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.nio.MappedByteBuffer;
+import java.util.TreeMap;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
 public interface ICompressedFile
 {
+    public ChannelProxy channel();
     public CompressionMetadata getMetadata();
+    public TreeMap<Long, MappedByteBuffer> chunkSegments();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 87ba677..328095b 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -65,8 +65,8 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
     {
         int size = (int) Math.min(fileLength, bufferSize);
         return useDirectBuffer
-                ? ByteBuffer.allocate(size)
-                : ByteBuffer.allocateDirect(size);
+                ? ByteBuffer.allocateDirect(size)
+                : ByteBuffer.allocate(size);
     }
 
     public static RandomAccessReader open(ChannelProxy channel, long overrideSize, PoolingSegmentedFile owner)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 129d914..cb4d132 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -133,7 +133,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
      */
     public static Builder getBuilder(Config.DiskAccessMode mode, boolean compressed)
     {
-        return compressed ? new CompressedSegmentedFile.Builder(null)
+        return compressed ? new CompressedPoolingSegmentedFile.Builder(null)
                           : mode == Config.DiskAccessMode.mmap ? new MmappedSegmentedFile.Builder()
                                                                : new BufferedPoolingSegmentedFile.Builder();
     }


Mime
View raw message