cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [1/4] cassandra git commit: IndexSummaryBuilder utilises offheap memory, and shares data between each IndexSummary opened from it
Date Wed, 04 Mar 2015 15:18:13 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 3c3fefa04 -> f3c0e11e2
  refs/heads/trunk 20b62de80 -> e473ce066


IndexSummaryBuilder utilises offheap memory, and shares data between
each IndexSummary opened from it

patch by benedict; reviewed by ariel for CASSANDRA-8757


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3c0e11e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3c0e11e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3c0e11e

Branch: refs/heads/cassandra-2.1
Commit: f3c0e11e2ddb0b0666e7723a3fca005707b778ea
Parents: 3c3fefa
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Wed Mar 4 15:07:32 2015 +0000
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Wed Mar 4 15:07:32 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../cassandra/io/sstable/IndexSummary.java      | 113 ++++++++++-----
 .../io/sstable/IndexSummaryBuilder.java         | 144 +++++++++----------
 .../cassandra/io/sstable/SSTableReader.java     |  69 ++++-----
 .../cassandra/io/sstable/SSTableWriter.java     |   4 +-
 .../cassandra/io/util/AbstractDataOutput.java   |   4 +-
 .../cassandra/io/util/DataOutputPlus.java       |   3 +-
 .../org/apache/cassandra/io/util/Memory.java    |  40 +++++-
 .../cassandra/io/util/SafeMemoryWriter.java     | 136 ++++++++++++++++++
 .../apache/cassandra/utils/concurrent/Ref.java  |   2 +-
 .../concurrent/WrappedSharedCloseable.java      |  14 +-
 .../cassandra/io/sstable/IndexSummaryTest.java  |  54 +++----
 .../cassandra/io/util/DataOutputTest.java       |  11 ++
 13 files changed, 411 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6133536..3b373ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,8 @@
  * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
    (CASSANDRA-8154)
  * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
+ * IndexSummaryBuilder utilises offheap memory, and shares data between
+   each IndexSummary opened from it (CASSANDRA-8757)
 Merged from 2.0:
  * Add offline tool to relevel sstables (CASSANDRA-8301)
  * Preserve stream ID for more protocol errors (CASSANDRA-8848)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 0cde124..bad50b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.io.sstable;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
-import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.IPartitioner;
@@ -54,9 +54,16 @@ public class IndexSummary extends WrappedSharedCloseable
     private final int minIndexInterval;
 
     private final IPartitioner partitioner;
-    private final int summarySize;
     private final int sizeAtFullSampling;
-    private final Memory bytes;
+    // we permit the memory to span a range larger than we use,
+    // so we have an accompanying count and length for each part
+    // we split our data into two ranges: offsets (indexing into entries),
+    // and entries containing the summary data
+    private final Memory offsets;
+    private final int offsetCount;
+    // entries is a list of (partition key, index file offset) pairs
+    private final Memory entries;
+    private final long entriesLength;
 
     /**
      * A value between 1 and BASE_SAMPLING_LEVEL that represents how many of the original
@@ -66,15 +73,18 @@ public class IndexSummary extends WrappedSharedCloseable
      */
     private final int samplingLevel;
 
-    public IndexSummary(IPartitioner partitioner, Memory bytes, int summarySize, int sizeAtFullSampling,
-                        int minIndexInterval, int samplingLevel)
+    public IndexSummary(IPartitioner partitioner, Memory offsets, int offsetCount, Memory
entries, long entriesLength,
+                        int sizeAtFullSampling, int minIndexInterval, int samplingLevel)
     {
-        super(bytes);
+        super(new Memory[] { offsets, entries });
+        assert offsets.getInt(0) == 0;
         this.partitioner = partitioner;
         this.minIndexInterval = minIndexInterval;
-        this.summarySize = summarySize;
+        this.offsetCount = offsetCount;
+        this.entriesLength = entriesLength;
         this.sizeAtFullSampling = sizeAtFullSampling;
-        this.bytes = bytes;
+        this.offsets = offsets;
+        this.entries = entries;
         this.samplingLevel = samplingLevel;
     }
 
@@ -83,9 +93,11 @@ public class IndexSummary extends WrappedSharedCloseable
         super(copy);
         this.partitioner = copy.partitioner;
         this.minIndexInterval = copy.minIndexInterval;
-        this.summarySize = copy.summarySize;
+        this.offsetCount = copy.offsetCount;
+        this.entriesLength = copy.entriesLength;
         this.sizeAtFullSampling = copy.sizeAtFullSampling;
-        this.bytes = copy.bytes;
+        this.offsets = copy.offsets;
+        this.entries = copy.entries;
         this.samplingLevel = copy.samplingLevel;
     }
 
@@ -93,7 +105,7 @@ public class IndexSummary extends WrappedSharedCloseable
     // Harmony's Collections implementation
     public int binarySearch(RowPosition key)
     {
-        int low = 0, mid = summarySize, high = mid - 1, result = -1;
+        int low = 0, mid = offsetCount, high = mid - 1, result = -1;
         while (low <= high)
         {
             mid = (low + high) >> 1;
@@ -123,7 +135,7 @@ public class IndexSummary extends WrappedSharedCloseable
     public int getPositionInSummary(int index)
     {
         // The first section of bytes holds a four-byte position for each entry in the summary,
so just multiply by 4.
-        return bytes.getInt(index << 2);
+        return offsets.getInt(index << 2);
     }
 
     public byte[] getKey(int index)
@@ -131,27 +143,23 @@ public class IndexSummary extends WrappedSharedCloseable
         long start = getPositionInSummary(index);
         int keySize = (int) (calculateEnd(index) - start - 8L);
         byte[] key = new byte[keySize];
-        bytes.getBytes(start, key, 0, keySize);
+        entries.getBytes(start, key, 0, keySize);
         return key;
     }
 
     public long getPosition(int index)
     {
-        return bytes.getLong(calculateEnd(index) - 8);
+        return entries.getLong(calculateEnd(index) - 8);
     }
 
-    public byte[] getEntry(int index)
+    public long getEndInSummary(int index)
     {
-        long start = getPositionInSummary(index);
-        long end = calculateEnd(index);
-        byte[] entry = new byte[(int)(end - start)];
-        bytes.getBytes(start, entry, 0, (int) (end - start));
-        return entry;
+        return calculateEnd(index);
     }
 
     private long calculateEnd(int index)
     {
-        return index == (summarySize - 1) ? bytes.size() : getPositionInSummary(index + 1);
+        return index == (offsetCount - 1) ? entriesLength : getPositionInSummary(index +
1);
     }
 
     public int getMinIndexInterval()
@@ -174,7 +182,7 @@ public class IndexSummary extends WrappedSharedCloseable
 
     public int size()
     {
-        return summarySize;
+        return offsetCount;
     }
 
     public int getSamplingLevel()
@@ -192,12 +200,27 @@ public class IndexSummary extends WrappedSharedCloseable
     }
 
     /**
-     * Returns the amount of off-heap memory used for this summary.
+     * Returns the amount of off-heap memory used for the entries portion of this summary.
      * @return size in bytes
      */
-    public long getOffHeapSize()
+    long getEntriesLength()
+    {
+        return entriesLength;
+    }
+
+    Memory getOffsets()
+    {
+        return offsets;
+    }
+
+    Memory getEntries()
+    {
+        return entries;
+    }
+
+    long getOffHeapSize()
     {
-        return bytes.size();
+        return offsetCount * 4 + entriesLength;
     }
 
     /**
@@ -224,14 +247,29 @@ public class IndexSummary extends WrappedSharedCloseable
         public void serialize(IndexSummary t, DataOutputPlus out, boolean withSamplingLevel)
throws IOException
         {
             out.writeInt(t.minIndexInterval);
-            out.writeInt(t.summarySize);
-            out.writeLong(t.bytes.size());
+            out.writeInt(t.offsetCount);
+            out.writeLong(t.getOffHeapSize());
             if (withSamplingLevel)
             {
                 out.writeInt(t.samplingLevel);
                 out.writeInt(t.sizeAtFullSampling);
             }
-            out.write(t.bytes);
+            // our on-disk representation treats the offsets and the summary data as one
contiguous structure,
+            // in which the offsets are based from the start of the structure. i.e., if the
offsets occupy
+            // X bytes, the value of the first offset will be X. In memory we split the two
regions up, so that
+            // the summary values are indexed from zero, so we apply a correction to the
offsets when de/serializing.
+            // In this case adding X to each of the offsets.
+            int baseOffset = t.offsetCount * 4;
+            for (int i = 0 ; i < t.offsetCount ; i++)
+            {
+                int offset = t.offsets.getInt(i * 4) + baseOffset;
+                // our serialization format for this file uses native byte order, so if this
is different to the
+                // default Java serialization order (BIG_ENDIAN) we have to reverse our bytes
+                if (ByteOrder.nativeOrder() != ByteOrder.BIG_ENDIAN)
+                    offset = Integer.reverseBytes(offset);
+                out.writeInt(offset);
+            }
+            out.write(t.entries, 0, t.entriesLength);
         }
 
         public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean
haveSamplingLevel, int expectedMinIndexInterval, int maxIndexInterval) throws IOException
@@ -243,7 +281,7 @@ public class IndexSummary extends WrappedSharedCloseable
                                                     minIndexInterval, expectedMinIndexInterval));
             }
 
-            int summarySize = in.readInt();
+            int offsetCount = in.readInt();
             long offheapSize = in.readLong();
             int samplingLevel, fullSamplingSummarySize;
             if (haveSamplingLevel)
@@ -254,7 +292,7 @@ public class IndexSummary extends WrappedSharedCloseable
             else
             {
                 samplingLevel = BASE_SAMPLING_LEVEL;
-                fullSamplingSummarySize = summarySize;
+                fullSamplingSummarySize = offsetCount;
             }
 
             int effectiveIndexInterval = (int) Math.ceil((BASE_SAMPLING_LEVEL / (double)
samplingLevel) * minIndexInterval);
@@ -264,9 +302,18 @@ public class IndexSummary extends WrappedSharedCloseable
                                                     " the current max index interval (%d)",
effectiveIndexInterval, maxIndexInterval));
             }
 
-            RefCountedMemory memory = new RefCountedMemory(offheapSize);
-            FBUtilities.copy(in, new MemoryOutputStream(memory), offheapSize);
-            return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize,
minIndexInterval, samplingLevel);
+            Memory offsets = Memory.allocate(offsetCount * 4);
+            Memory entries = Memory.allocate(offheapSize - offsets.size());
+            FBUtilities.copy(in, new MemoryOutputStream(offsets), offsets.size());
+            FBUtilities.copy(in, new MemoryOutputStream(entries), entries.size());
+            // our on-disk representation treats the offsets and the summary data as one
contiguous structure,
+            // in which the offsets are based from the start of the structure. i.e., if the
offsets occupy
+            // X bytes, the value of the first offset will be X. In memory we split the two
regions up, so that
+            // the summary values are indexed from zero, so we apply a correction to the
offsets when de/serializing.
+            // In this case subtracting X from each of the offsets.
+            for (int i = 0 ; i < offsets.size() ; i += 4)
+                offsets.setInt(i, (int) (offsets.getInt(i) - offsets.size()));
+            return new IndexSummary(partitioner, offsets, offsetCount, entries, entries.size(),
fullSamplingSummarySize, minIndexInterval, samplingLevel);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 3b93b31..54e8dd2 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,36 +17,33 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.nio.ByteOrder;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.SafeMemoryWriter;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
-import static org.apache.cassandra.io.sstable.SSTable.getMinimalKey;
 
-public class IndexSummaryBuilder
+public class IndexSummaryBuilder implements AutoCloseable
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class);
 
-    private final ArrayList<Long> positions;
-    private final ArrayList<DecoratedKey> keys;
+    // the offset in the keys memory region to look for a given summary boundary
+    private final SafeMemoryWriter offsets;
+    private final SafeMemoryWriter entries;
+
     private final int minIndexInterval;
     private final int samplingLevel;
     private final int[] startPoints;
     private long keysWritten = 0;
     private long indexIntervalMatches = 0;
-    private long offheapSize = 0;
     private long nextSamplePosition;
 
     // for each ReadableBoundary, we map its dataLength property to itself, permitting us
to lookup the
@@ -75,11 +72,15 @@ public class IndexSummaryBuilder
         final DecoratedKey lastKey;
         final long indexLength;
         final long dataLength;
-        public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength)
+        final int summaryCount;
+        final long entriesLength;
+        public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength,
int summaryCount, long entriesLength)
         {
             this.lastKey = lastKey;
             this.indexLength = indexLength;
             this.dataLength = dataLength;
+            this.summaryCount = summaryCount;
+            this.entriesLength = entriesLength;
         }
     }
 
@@ -105,10 +106,9 @@ public class IndexSummaryBuilder
         }
 
         // for initializing data structures, adjust our estimates based on the sampling level
-        maxExpectedEntries = (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL;
-        positions = new ArrayList<>((int)maxExpectedEntries);
-        keys = new ArrayList<>((int)maxExpectedEntries);
-        // if we're downsampling we may not use index 0
+        maxExpectedEntries = Math.max(1, (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL);
+        offsets = new SafeMemoryWriter(4 * maxExpectedEntries).withByteOrder(ByteOrder.nativeOrder());
+        entries = new SafeMemoryWriter(40 * maxExpectedEntries).withByteOrder(ByteOrder.nativeOrder());
         setNextSamplePosition(-minIndexInterval);
     }
 
@@ -165,16 +165,16 @@ public class IndexSummaryBuilder
     {
         if (keysWritten == nextSamplePosition)
         {
-            keys.add(getMinimalKey(decoratedKey));
-            offheapSize += decoratedKey.getKey().remaining();
-            positions.add(indexStart);
-            offheapSize += TypeSizes.NATIVE.sizeof(indexStart);
+            assert entries.length() <= Integer.MAX_VALUE;
+            offsets.writeInt((int) entries.length());
+            entries.write(decoratedKey.getKey());
+            entries.writeLong(indexStart);
             setNextSamplePosition(keysWritten);
         }
         else if (dataEnd != 0 && keysWritten + 1 == nextSamplePosition)
         {
             // this is the last key in this summary interval, so stash it
-            ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd);
+            ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd,
(int)(offsets.length() / 4), entries.length());
             lastReadableByData.put(dataEnd, boundary);
             lastReadableByIndex.put(indexEnd, boundary);
         }
@@ -201,52 +201,39 @@ public class IndexSummaryBuilder
 
     public IndexSummary build(IPartitioner partitioner)
     {
+        // this method should only be called when we've finished appending records, so we
truncate the
+        // memory we're using to the exact amount required to represent it before building
our summary
+        entries.setCapacity(entries.length());
+        offsets.setCapacity(offsets.length());
         return build(partitioner, null);
     }
 
-    // lastIntervalKey should come from getLastReadableBoundary().lastKey
-    public IndexSummary build(IPartitioner partitioner, DecoratedKey lastIntervalKey)
+    // build the summary up to the provided boundary; this is backed by shared memory between
+    // multiple invocations of this build method
+    public IndexSummary build(IPartitioner partitioner, ReadableBoundary boundary)
     {
-        assert keys.size() > 0;
-        assert keys.size() == positions.size();
-
-        int length;
-        if (lastIntervalKey == null)
-            length = keys.size();
-        else // since it's an inclusive upper bound, this should never match exactly
-            length = -1 -Collections.binarySearch(keys, lastIntervalKey);
-
-        assert length > 0;
-
-        long offheapSize = this.offheapSize;
-        if (length < keys.size())
-            for (int i = length ; i < keys.size() ; i++)
-                offheapSize -= keys.get(i).getKey().remaining() + TypeSizes.NATIVE.sizeof(positions.get(i));
-
-        // first we write out the position in the *summary* for each key in the summary,
-        // then we write out (key, actual index position) pairs
-        Memory memory = Memory.allocate(offheapSize + (length * 4));
-        int idxPosition = 0;
-        int keyPosition = length * 4;
-        for (int i = 0; i < length; i++)
+        assert entries.length() > 0;
+
+        int count = (int) (offsets.length() / 4);
+        long entriesLength = entries.length();
+        if (boundary != null)
         {
-            // write the position of the actual entry in the index summary (4 bytes)
-            memory.setInt(idxPosition, keyPosition);
-            idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
-
-            // write the key
-            ByteBuffer keyBytes = keys.get(i).getKey();
-            memory.setBytes(keyPosition, keyBytes);
-            keyPosition += keyBytes.remaining();
-
-            // write the position in the actual index file
-            long actualIndexPosition = positions.get(i);
-            memory.setLong(keyPosition, actualIndexPosition);
-            keyPosition += TypeSizes.NATIVE.sizeof(actualIndexPosition);
+            count = boundary.summaryCount;
+            entriesLength = boundary.entriesLength;
         }
-        assert keyPosition == offheapSize + (length * 4);
+
         int sizeAtFullSampling = (int) Math.ceil(keysWritten / (double) minIndexInterval);
-        return new IndexSummary(partitioner, memory, length, sizeAtFullSampling, minIndexInterval,
samplingLevel);
+        assert count > 0;
+        return new IndexSummary(partitioner, offsets.currentBuffer().sharedCopy(),
+                                count, entries.currentBuffer().sharedCopy(), entriesLength,
+                                sizeAtFullSampling, minIndexInterval, samplingLevel);
+    }
+
+    // close the builder and release any associated memory
+    public void close()
+    {
+        entries.close();
+        offsets.close();
     }
 
     public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
@@ -294,26 +281,25 @@ public class IndexSummaryBuilder
         int[] startPoints = Downsampling.getStartPoints(currentSamplingLevel, newSamplingLevel);
 
         // calculate new off-heap size
-        int removedKeyCount = 0;
-        long newOffHeapSize = existing.getOffHeapSize();
+        int newKeyCount = existing.size();
+        long newEntriesLength = existing.getEntriesLength();
         for (int start : startPoints)
         {
             for (int j = start; j < existing.size(); j += currentSamplingLevel)
             {
-                removedKeyCount++;
-                newOffHeapSize -= existing.getEntry(j).length;
+                newKeyCount--;
+                long length = existing.getEndInSummary(j) - existing.getPositionInSummary(j);
+                newEntriesLength -= length;
             }
         }
 
-        int newKeyCount = existing.size() - removedKeyCount;
-
-        // Subtract (removedKeyCount * 4) from the new size to account for fewer entries
in the first section, which
-        // stores the position of the actual entries in the summary.
-        RefCountedMemory memory = new RefCountedMemory(newOffHeapSize - (removedKeyCount
* 4));
+        Memory oldEntries = existing.getEntries();
+        Memory newOffsets = Memory.allocate(newKeyCount * 4);
+        Memory newEntries = Memory.allocate(newEntriesLength);
 
         // Copy old entries to our new Memory.
-        int idxPosition = 0;
-        int keyPosition = newKeyCount * 4;
+        int i = 0;
+        int newEntriesOffset = 0;
         outer:
         for (int oldSummaryIndex = 0; oldSummaryIndex < existing.size(); oldSummaryIndex++)
         {
@@ -326,15 +312,15 @@ public class IndexSummaryBuilder
             }
 
             // write the position of the actual entry in the index summary (4 bytes)
-            memory.setInt(idxPosition, keyPosition);
-            idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
-
-            // write the entry itself
-            byte[] entry = existing.getEntry(oldSummaryIndex);
-            memory.setBytes(keyPosition, entry, 0, entry.length);
-            keyPosition += entry.length;
+            newOffsets.setInt(i * 4, newEntriesOffset);
+            i++;
+            long start = existing.getPositionInSummary(oldSummaryIndex);
+            long length = existing.getEndInSummary(oldSummaryIndex) - start;
+            newEntries.put(newEntriesOffset, oldEntries, start, length);
+            newEntriesOffset += length;
         }
-        return new IndexSummary(partitioner, memory, newKeyCount, existing.getMaxNumberOfEntries(),
-                                minIndexInterval, newSamplingLevel);
+        assert newEntriesOffset == newEntriesLength;
+        return new IndexSummary(partitioner, newOffsets, newKeyCount, newEntries, newEntriesLength,
+                                existing.getMaxNumberOfEntries(), minIndexInterval, newSamplingLevel);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 973b0c9..41e4adb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -736,40 +736,40 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
             long indexSize = primaryIndex.length();
             long histogramCount = sstableMetadata.estimatedRowSize.count();
             long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
-                               ? histogramCount
-                               : estimateRowsFromIndex(primaryIndex); // statistics is supposed
to be optional
+                                 ? histogramCount
+                                 : estimateRowsFromIndex(primaryIndex); // statistics is
supposed to be optional
 
-            if (recreateBloomFilter)
-                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(),
true);
-
-            IndexSummaryBuilder summaryBuilder = null;
-            if (!summaryLoaded)
-                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(),
samplingLevel);
-
-            long indexPosition;
-            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+            try(IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys,
metadata.getMinIndexInterval(), samplingLevel))
             {
-                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
-                RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex,
descriptor.version);
-                DecoratedKey decoratedKey = partitioner.decorateKey(key);
-                if (first == null)
-                    first = decoratedKey;
-                last = decoratedKey;
 
                 if (recreateBloomFilter)
-                    bf.add(decoratedKey.getKey());
+                    bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(),
true);
 
-                // if summary was already read from disk we don't want to re-populate it
using primary index
-                if (!summaryLoaded)
+                long indexPosition;
+                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
                 {
-                    summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
-                    ibuilder.addPotentialBoundary(indexPosition);
-                    dbuilder.addPotentialBoundary(indexEntry.position);
+                    ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+                    RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex,
descriptor.version);
+                    DecoratedKey decoratedKey = partitioner.decorateKey(key);
+                    if (first == null)
+                        first = decoratedKey;
+                    last = decoratedKey;
+
+                    if (recreateBloomFilter)
+                        bf.add(decoratedKey.getKey());
+
+                    // if summary was already read from disk we don't want to re-populate
it using primary index
+                    if (!summaryLoaded)
+                    {
+                        summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+                        ibuilder.addPotentialBoundary(indexPosition);
+                        dbuilder.addPotentialBoundary(indexEntry.position);
+                    }
                 }
-            }
 
-            if (!summaryLoaded)
-                indexSummary = summaryBuilder.build(partitioner);
+                if (!summaryLoaded)
+                    indexSummary = summaryBuilder.build(partitioner);
+            }
         }
         finally
         {
@@ -1004,16 +1004,17 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
         try
         {
             long indexSize = primaryIndex.length();
-            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(),
metadata.getMinIndexInterval(), newSamplingLevel);
-
-            long indexPosition;
-            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+            try (IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(),
metadata.getMinIndexInterval(), newSamplingLevel))
             {
-                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)),
indexPosition);
-                RowIndexEntry.Serializer.skip(primaryIndex);
-            }
+                long indexPosition;
+                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+                {
+                    summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)),
indexPosition);
+                    RowIndexEntry.Serializer.skip(primaryIndex);
+                }
 
-            return summaryBuilder.build(partitioner);
+                return summaryBuilder.build(partitioner);
+            }
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index b67685d..b35b652 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -403,7 +403,7 @@ public class SSTableWriter extends SSTable
         SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
                                                            components, metadata,
                                                            partitioner, ifile,
-                                                           dfile, iwriter.summary.build(partitioner,
boundary.lastKey),
+                                                           dfile, iwriter.summary.build(partitioner,
boundary),
                                                            iwriter.bf.sharedCopy(), maxDataAge,
sstableMetadata, SSTableReader.OpenReason.EARLY);
 
         // now it's open, find the ACTUAL last readable key (i.e. for which the data file
has also been flushed)
@@ -470,6 +470,7 @@ public class SSTableWriter extends SSTable
         if (finishType.isFinal)
         {
             iwriter.bf.close();
+            iwriter.summary.close();
             // try to save the summaries to disk
             sstable.saveSummary(iwriter.builder, dbuilder);
             iwriter = null;
@@ -627,6 +628,7 @@ public class SSTableWriter extends SSTable
 
         public void abort()
         {
+            summary.close();
             indexFile.abort();
             bf.close();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
index 3e38293..8f4bed8 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
@@ -321,9 +321,9 @@ public abstract class AbstractDataOutput extends OutputStream implements
DataOut
         }
     }
 
-    public void write(Memory memory) throws IOException
+    public void write(Memory memory, long offset, long length) throws IOException
     {
-        for (ByteBuffer buffer : memory.asByteBuffers())
+        for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
             write(buffer);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index 36c25ee..c2901e1 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -27,6 +27,5 @@ public interface DataOutputPlus extends DataOutput
     // write the buffer without modifying its position
     void write(ByteBuffer buffer) throws IOException;
 
-    void write(Memory memory) throws IOException;
-
+    void write(Memory memory, long offset, long length) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index ea78840..dcb9de6 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -25,6 +25,7 @@ import com.sun.jna.Native;
 import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 import sun.misc.Unsafe;
 import sun.nio.ch.DirectBuffer;
@@ -77,6 +78,9 @@ public class Memory implements AutoCloseable
         if (bytes < 0)
             throw new IllegalArgumentException();
 
+        if (Ref.DEBUG_ENABLED)
+            return new SafeMemory(bytes);
+
         return new Memory(bytes);
     }
 
@@ -163,6 +167,33 @@ public class Memory implements AutoCloseable
         }
     }
 
+    public void setShort(long offset, short l)
+    {
+        checkBounds(offset, offset + 4);
+        if (unaligned)
+        {
+            unsafe.putShort(peer + offset, l);
+        }
+        else
+        {
+            putShortByByte(peer + offset, l);
+        }
+    }
+
+    private void putShortByByte(long address, short value)
+    {
+        if (bigEndian)
+        {
+            unsafe.putByte(address, (byte) (value >> 8));
+            unsafe.putByte(address + 1, (byte) (value));
+        }
+        else
+        {
+            unsafe.putByte(address + 1, (byte) (value >> 8));
+            unsafe.putByte(address, (byte) (value));
+        }
+    }
+
     public void setBytes(long memoryOffset, ByteBuffer buffer)
     {
         if (buffer == null)
@@ -340,20 +371,20 @@ public class Memory implements AutoCloseable
         return false;
     }
 
-    public ByteBuffer[] asByteBuffers()
+    public ByteBuffer[] asByteBuffers(long offset, long length)
     {
         if (size() == 0)
             return new ByteBuffer[0];
 
-        ByteBuffer[] result = new ByteBuffer[(int) (size() / Integer.MAX_VALUE) + 1];
-        long offset = 0;
+        ByteBuffer[] result = new ByteBuffer[(int) (length / Integer.MAX_VALUE) + 1];
         int size = (int) (size() / result.length);
         for (int i = 0 ; i < result.length - 1 ; i++)
         {
             result[i] = MemoryUtil.getByteBuffer(peer + offset, size);
             offset += size;
+            length -= size;
         }
-        result[result.length - 1] = MemoryUtil.getByteBuffer(peer + offset, (int) (size()
- offset));
+        result[result.length - 1] = MemoryUtil.getByteBuffer(peer + offset, (int) length);
         return result;
     }
 
@@ -366,5 +397,4 @@ public class Memory implements AutoCloseable
     {
         return String.format("Memory@[%x..%x)", peer, peer + size);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
new file mode 100644
index 0000000..1998cc6
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class SafeMemoryWriter extends AbstractDataOutput implements DataOutputPlus
+{
+    private ByteOrder order = ByteOrder.BIG_ENDIAN;
+    private SafeMemory buffer;
+    private long length;
+
+    public SafeMemoryWriter(long initialCapacity)
+    {
+        buffer = new SafeMemory(initialCapacity);
+    }
+
+    public void write(byte[] buffer, int offset, int count)
+    {
+        long newLength = ensureCapacity(count);
+        this.buffer.setBytes(this.length, buffer, offset, count);
+        this.length = newLength;
+    }
+
+    public void write(int oneByte)
+    {
+        long newLength = ensureCapacity(1);
+        buffer.setByte(length++, (byte) oneByte);
+        length = newLength;
+    }
+
+    public void writeShort(int val) throws IOException
+    {
+        if (order != ByteOrder.nativeOrder())
+            val = Short.reverseBytes((short) val);
+        long newLength = ensureCapacity(2);
+        buffer.setShort(length, (short) val);
+        length = newLength;
+    }
+
+    public void writeInt(int val)
+    {
+        if (order != ByteOrder.nativeOrder())
+            val = Integer.reverseBytes(val);
+        long newLength = ensureCapacity(4);
+        buffer.setInt(length, val);
+        length = newLength;
+    }
+
+    public void writeLong(long val)
+    {
+        if (order != ByteOrder.nativeOrder())
+            val = Long.reverseBytes(val);
+        long newLength = ensureCapacity(8);
+        buffer.setLong(length, val);
+        length = newLength;
+    }
+
+    public void write(ByteBuffer buffer)
+    {
+        long newLength = ensureCapacity(buffer.remaining());
+        this.buffer.setBytes(length, buffer);
+        length = newLength;
+    }
+
+    public void write(Memory memory)
+    {
+        long newLength = ensureCapacity(memory.size());
+        buffer.put(length, memory, 0, memory.size());
+        length = newLength;
+    }
+
+    private long ensureCapacity(long size)
+    {
+        long newLength = this.length + size;
+        if (newLength > buffer.size())
+            setCapacity(Math.max(newLength, buffer.size() + (buffer.size() / 2)));
+        return newLength;
+    }
+
+    public SafeMemory currentBuffer()
+    {
+        return buffer;
+    }
+
+    public void setCapacity(long newCapacity)
+    {
+        if (newCapacity != capacity())
+        {
+            SafeMemory oldBuffer = buffer;
+            buffer = this.buffer.copy(newCapacity);
+            oldBuffer.free();
+        }
+    }
+
+    public void close()
+    {
+        buffer.close();
+    }
+
+    public long length()
+    {
+        return length;
+    }
+
+    public long capacity()
+    {
+        return buffer.size();
+    }
+
+    // TODO: consider hoisting this into DataOutputPlus, since most implementations can copy
with this gracefully
+    // this would simplify IndexSummary.IndexSummarySerializer.serialize()
+    public SafeMemoryWriter withByteOrder(ByteOrder order)
+    {
+        this.order = order;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 8213c46..4e6cef7 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -50,7 +50,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 public final class Ref<T> implements RefCounted<T>, AutoCloseable
 {
     static final Logger logger = LoggerFactory.getLogger(Ref.class);
-    static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
+    public static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount",
"false").equalsIgnoreCase("true");
 
     final State state;
     final T referent;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
index c656f28..96e226c 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
@@ -18,26 +18,34 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
+import java.util.Arrays;
+
 /**
  * An implementation of SharedCloseable that wraps a normal AutoCloseable,
  * ensuring its close method is only called when all instances of SharedCloseable have been
  */
 public abstract class WrappedSharedCloseable extends SharedCloseableImpl
 {
-    final AutoCloseable wrapped;
+    final AutoCloseable[] wrapped;
 
     public WrappedSharedCloseable(final AutoCloseable closeable)
     {
+        this(new AutoCloseable[] { closeable});
+    }
+
+    public WrappedSharedCloseable(final AutoCloseable[] closeable)
+    {
         super(new RefCounted.Tidy()
         {
             public void tidy() throws Exception
             {
-                closeable.close();
+                for (AutoCloseable c : closeable)
+                    c.close();
             }
 
             public String name()
             {
-                return closeable.toString();
+                return Arrays.toString(closeable);
             }
         });
         wrapped = closeable;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 9aca66d..9c709a3 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -91,38 +91,42 @@ public class IndexSummaryTest
     public void testAddEmptyKey() throws Exception
     {
         IPartitioner p = new RandomPartitioner();
-        IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL);
-        builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0);
-        IndexSummary summary = builder.build(p);
-        assertEquals(1, summary.size());
-        assertEquals(0, summary.getPosition(0));
-        assertArrayEquals(new byte[0], summary.getKey(0));
-
-        DataOutputBuffer dos = new DataOutputBuffer();
-        IndexSummary.serializer.serialize(summary, dos, false);
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
-        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1, 1);
-
-        assertEquals(1, loaded.size());
-        assertEquals(summary.getPosition(0), loaded.getPosition(0));
-        assertArrayEquals(summary.getKey(0), summary.getKey(0));
+        try (IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL))
+        {
+            builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0);
+            IndexSummary summary = builder.build(p);
+            assertEquals(1, summary.size());
+            assertEquals(0, summary.getPosition(0));
+            assertArrayEquals(new byte[0], summary.getKey(0));
+
+            DataOutputBuffer dos = new DataOutputBuffer();
+            IndexSummary.serializer.serialize(summary, dos, false);
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
+            IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1, 1);
+
+            assertEquals(1, loaded.size());
+            assertEquals(summary.getPosition(0), loaded.getPosition(0));
+            assertArrayEquals(summary.getKey(0), summary.getKey(0));
+        }
     }
 
     private Pair<List<DecoratedKey>, IndexSummary> generateRandomIndex(int size,
int interval)
     {
         List<DecoratedKey> list = Lists.newArrayList();
-        IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval, BASE_SAMPLING_LEVEL);
-        for (int i = 0; i < size; i++)
+        try (IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval,
BASE_SAMPLING_LEVEL))
         {
-            UUID uuid = UUID.randomUUID();
-            DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(uuid));
-            list.add(key);
+            for (int i = 0; i < size; i++)
+            {
+                UUID uuid = UUID.randomUUID();
+                DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(uuid));
+                list.add(key);
+            }
+            Collections.sort(list);
+            for (int i = 0; i < size; i++)
+                builder.maybeAddEntry(list.get(i), i);
+            IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
+            return Pair.create(list, summary);
         }
-        Collections.sort(list);
-        for (int i = 0; i < size; i++)
-            builder.maybeAddEntry(list.get(i), i);
-        IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
-        return Pair.create(list, summary);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 76f3304..7110d1d 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -92,6 +92,17 @@ public class DataOutputTest
     }
 
     @Test
+    public void testSafeMemoryWriter() throws IOException
+    {
+        SafeMemoryWriter write = new SafeMemoryWriter(10);
+        DataInput canon = testWrite(write);
+        byte[] bytes = new byte[345];
+        write.currentBuffer().getBytes(0, bytes, 0, 345);
+        DataInput test = new DataInputStream(new ByteArrayInputStream(bytes));
+        testRead(test, canon);
+    }
+
+    @Test
     public void testFileOutputStream() throws IOException
     {
         File file = FileUtils.createTempFile("dataoutput", "test");


Mime
View raw message