Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CF512F2F6 for ; Sun, 28 Apr 2013 14:50:15 +0000 (UTC) Received: (qmail 10200 invoked by uid 500); 28 Apr 2013 14:50:15 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 10170 invoked by uid 500); 28 Apr 2013 14:50:15 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 10155 invoked by uid 99); 28 Apr 2013 14:50:15 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 28 Apr 2013 14:50:15 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 099968830D2; Sun, 28 Apr 2013 14:50:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Sun, 28 Apr 2013 14:50:14 -0000 Message-Id: <387cfe489d7a41faac917767f25d65a3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] git commit: Reduce memory consumption of IndexSummary patch by jbellis; reviewed by vijay for CASSANDRA-5506 Updated Branches: refs/heads/trunk 1aa987402 -> 4439b4117 Reduce memory consumption of IndexSummary patch by jbellis; reviewed by vijay for CASSANDRA-5506 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9851b73f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9851b73f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9851b73f Branch: refs/heads/trunk Commit: 9851b73fc3bbaf61ec80b54c64483bcf56aaa396 Parents: dc5b1e9 Author: Jonathan Ellis Authored: Sun Apr 28 09:10:38 2013 -0500 Committer: Jonathan Ellis Committed: Sun Apr 28 09:10:38 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/DataTracker.java | 1 - .../db/compaction/AbstractCompactionStrategy.java | 2 +- .../apache/cassandra/io/sstable/IndexSummary.java | 115 ++++++++------- .../cassandra/io/sstable/IndexSummaryBuilder.java | 76 ++++++++++ .../apache/cassandra/io/sstable/SSTableReader.java | 43 +++--- .../apache/cassandra/io/sstable/SSTableWriter.java | 11 +- .../org/apache/cassandra/utils/ByteBufferUtil.java | 8 + .../cassandra/io/sstable/SSTableReaderTest.java | 4 +- 9 files changed, 170 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9851b73f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7634742..c843e5e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.5 + * reduce memory consumption of IndexSummary (CASSANDRA-5506) * remove per-row column name bloom filters (CASSANDRA-5492) * Include fatal errors in trace events (CASSANDRA-5447) * Ensure that PerRowSecondaryIndex is notified of row-level deletes http://git-wip-us.apache.org/repos/asf/cassandra/blob/9851b73f/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index adbe037..7f6e94c 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -329,7 +329,6 @@ public class DataTracker { for (SSTableReader sstable : newSSTables) { - assert sstable.getKeySamples() != null; if (logger.isDebugEnabled()) logger.debug(String.format("adding %s to list of files tracked for %s.%s", sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9851b73f/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index cb15109..a588216 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -190,7 +190,7 @@ public abstract class AbstractCompactionStrategy else { // what percentage of columns do we expect to compact outside of overlap? - if (sstable.getKeySamples().size() < 2) + if (sstable.getKeySamples().length < 2) { // we have too few samples to estimate correct percentage return false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9851b73f/src/java/org/apache/cassandra/io/sstable/IndexSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java index 1b9291b..3213d20 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java @@ -21,109 +21,110 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.utils.ByteBufferUtil; -/** - * Two approaches to building an IndexSummary: - * 1. Call maybeAddEntry with every potential index entry - * 2. Call shouldAddEntry, [addEntry,] incrementRowid - */ public class IndexSummary { public static final IndexSummarySerializer serializer = new IndexSummarySerializer(); - private final ArrayList positions; - private final ArrayList keys; - private long keysWritten = 0; - public IndexSummary(long expectedKeys) - { - long expectedEntries = expectedKeys / DatabaseDescriptor.getIndexInterval(); - if (expectedEntries > Integer.MAX_VALUE) - // TODO: that's a _lot_ of keys, or a very low interval - throw new RuntimeException("Cannot use index_interval of " + DatabaseDescriptor.getIndexInterval() + " with " + expectedKeys + " (expected) keys."); - positions = new ArrayList((int)expectedEntries); - keys = new ArrayList((int)expectedEntries); - } + private final long[] positions; + private final byte[][] keys; + private final IPartitioner partitioner; - private IndexSummary() + public IndexSummary(IPartitioner partitioner, byte[][] keys, long[] positions) { - positions = new ArrayList(); - keys = new ArrayList(); - } + this.partitioner = partitioner; + assert keys != null && keys.length > 0; + assert keys.length == positions.length; - public void incrementRowid() - { - keysWritten++; + this.keys = keys; + this.positions = positions; } - public boolean shouldAddEntry() + public byte[][] getKeys() { - return keysWritten % DatabaseDescriptor.getIndexInterval() == 0; + return keys; } - public void addEntry(DecoratedKey key, long indexPosition) + // binary search is notoriously more difficult to get right than it looks; this is lifted from + // Harmony's Collections implementation + public int binarySearch(RowPosition key) { - keys.add(SSTable.getMinimalKey(key)); - positions.add(indexPosition); - } + int low = 0, mid = keys.length, high = mid - 1, result = -1; - public void maybeAddEntry(DecoratedKey decoratedKey, long indexPosition) - { - if (shouldAddEntry()) - addEntry(decoratedKey, indexPosition); - incrementRowid(); + while (low <= high) + { + mid = (low + high) >> 1; + result = -partitioner.decorateKey(ByteBuffer.wrap(keys[mid])).compareTo(key); + + if (result > 0) + { + low = mid + 1; + } + else if (result == 0) + { + return mid; + } + else + { + high = mid - 1; + } + } + + return -mid - (result < 0 ? 1 : 2); } - public List getKeys() + public byte[] getKey(int index) { - return keys; + return keys[index]; } public long getPosition(int index) { - return positions.get(index); + return positions[index]; } - public void complete() + public int size() { - keys.trimToSize(); - positions.trimToSize(); + return positions.length; } public static class IndexSummarySerializer { - public void serialize(IndexSummary t, DataOutput dos) throws IOException + public void serialize(IndexSummary t, DataOutput out) throws IOException { - assert t.keys.size() == t.positions.size() : "keysize and the position sizes are not same."; - dos.writeInt(DatabaseDescriptor.getIndexInterval()); - dos.writeInt(t.keys.size()); - for (int i = 0; i < t.keys.size(); i++) + out.writeInt(DatabaseDescriptor.getIndexInterval()); + out.writeInt(t.keys.length); + for (int i = 0; i < t.keys.length; i++) { - dos.writeLong(t.positions.get(i)); - ByteBufferUtil.writeWithLength(t.keys.get(i).key, dos); + out.writeLong(t.getPosition(i)); + ByteBufferUtil.writeWithLength(t.keys[i], out); } } - public IndexSummary deserialize(DataInput dis, IPartitioner partitioner) throws IOException + public IndexSummary deserialize(DataInput in, IPartitioner partitioner) throws IOException { - IndexSummary summary = new IndexSummary(); - if (dis.readInt() != DatabaseDescriptor.getIndexInterval()) + if (in.readInt() != DatabaseDescriptor.getIndexInterval()) throw new IOException("Cannot read the saved summary because Index Interval changed."); - int size = dis.readInt(); + int size = in.readInt(); + long[] positions = new long[size]; + byte[][] keys = new byte[size][]; + for (int i = 0; i < size; i++) { - long location = dis.readLong(); - ByteBuffer key = ByteBufferUtil.readWithLength(dis); - summary.addEntry(partitioner.decorateKey(key), location); + positions[i] = in.readLong(); + keys[i] = ByteBufferUtil.readBytes(in, in.readInt()); } - return summary; + + return new IndexSummary(partitioner, keys, positions); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9851b73f/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java new file mode 100644 index 0000000..80b0d3a --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable; + +import java.util.ArrayList; + +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Longs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class IndexSummaryBuilder +{ + private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class); + + private final ArrayList positions; + private final ArrayList keys; + private long keysWritten = 0; + + public IndexSummaryBuilder(long expectedKeys) + { + long expectedEntries = expectedKeys / DatabaseDescriptor.getIndexInterval(); + if (expectedEntries > Integer.MAX_VALUE) + { + // that's a _lot_ of keys, and a very low interval + int effectiveInterval = (int) Math.ceil((double) Integer.MAX_VALUE / expectedKeys); + expectedEntries = expectedKeys / effectiveInterval; + assert expectedEntries <= Integer.MAX_VALUE : expectedEntries; + logger.warn("Index interval of {} is too low for {} expected keys; using interval of {} instead", + DatabaseDescriptor.getIndexInterval(), expectedKeys, effectiveInterval); + } + positions = new ArrayList((int)expectedEntries); + keys = new ArrayList((int)expectedEntries); + } + + public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexPosition) + { + if (keysWritten % DatabaseDescriptor.getIndexInterval() == 0) + { + keys.add(ByteBufferUtil.getArray(decoratedKey.key)); + positions.add(indexPosition); + } + keysWritten++; + + return this; + } + + public IndexSummary build(IPartitioner partitioner) + { + byte[][] keysArray = new byte[keys.size()][]; + for (int i = 0; i < keys.size(); i++) + keysArray[i] = keys.get(i); + + return new IndexSummary(partitioner, keysArray, Longs.toArray(positions)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9851b73f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 21a8673..e4a2fe1 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -106,7 +106,7 @@ public class SSTableReader extends SSTable for (SSTableReader sstable : sstables) { - int indexKeyCount = sstable.getKeySamples().size(); + int indexKeyCount = sstable.getKeySamples().length; count = count + (indexKeyCount + 1) * DatabaseDescriptor.getIndexInterval(); if (logger.isDebugEnabled()) logger.debug("index size for bloom filter calc for file : " + sstable.getFilename() + " : " + count); @@ -366,8 +366,9 @@ public class SSTableReader extends SSTable if (recreatebloom) bf = LegacyBloomFilter.getFilter(estimatedKeys, 15); + IndexSummaryBuilder summaryBuilder = null; if (!summaryLoaded) - indexSummary = new IndexSummary(estimatedKeys); + summaryBuilder = new IndexSummaryBuilder(estimatedKeys); long indexPosition; while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize) @@ -385,20 +386,23 @@ public class SSTableReader extends SSTable // if summary was already read from disk we don't want to re-populate it using primary index if (!summaryLoaded) { - indexSummary.maybeAddEntry(decoratedKey, indexPosition); + summaryBuilder.maybeAddEntry(decoratedKey, indexPosition); ibuilder.addPotentialBoundary(indexPosition); dbuilder.addPotentialBoundary(indexEntry.position); } } + + if (!summaryLoaded) + indexSummary = summaryBuilder.build(partitioner); } finally { FileUtils.closeQuietly(primaryIndex); } + first = getMinimalKey(first); last = getMinimalKey(last); // finalize the load. - indexSummary.complete(); // finalize the state of the reader ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); @@ -478,8 +482,7 @@ public class SSTableReader extends SSTable /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */ public long getIndexScanPosition(RowPosition key) { - assert indexSummary.getKeys() != null && indexSummary.getKeys().size() > 0; - int index = Collections.binarySearch(indexSummary.getKeys(), key); + int index = indexSummary.binarySearch(key); if (index < 0) { // binary search gives us the first index _greater_ than the key searched for, @@ -530,7 +533,7 @@ public class SSTableReader extends SSTable */ public long estimatedKeys() { - return indexSummary.getKeys().size() * DatabaseDescriptor.getIndexInterval(); + return indexSummary.size() * DatabaseDescriptor.getIndexInterval(); } /** @@ -540,7 +543,7 @@ public class SSTableReader extends SSTable public long estimatedKeysForRanges(Collection> ranges) { long sampleKeyCount = 0; - List> sampleIndexes = getSampleIndexesForRanges(indexSummary.getKeys(), ranges); + List> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges); for (Pair sampleIndexRange : sampleIndexes) sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1); return Math.max(1, sampleKeyCount * DatabaseDescriptor.getIndexInterval()); @@ -549,36 +552,34 @@ public class SSTableReader extends SSTable /** * @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable. */ - public Collection getKeySamples() + public byte[][] getKeySamples() { return indexSummary.getKeys(); } - private static List> getSampleIndexesForRanges(List samples, Collection> ranges) + private static List> getSampleIndexesForRanges(IndexSummary summary, Collection> ranges) { // use the index to determine a minimal section for each range List> positions = new ArrayList>(); - if (samples.isEmpty()) - return positions; for (Range range : Range.normalize(ranges)) { RowPosition leftPosition = range.left.maxKeyBound(); RowPosition rightPosition = range.right.maxKeyBound(); - int left = Collections.binarySearch(samples, leftPosition); + int left = summary.binarySearch(leftPosition); if (left < 0) left = (left + 1) * -1; else // left range are start exclusive left = left + 1; - if (left == samples.size()) + if (left == summary.size()) // left is past the end of the sampling continue; int right = Range.isWrapAround(range.left, range.right) - ? samples.size() - 1 - : Collections.binarySearch(samples, rightPosition); + ? summary.size() - 1 + : summary.binarySearch(rightPosition); if (right < 0) { // range are end inclusive so we use the previous index from what binarySearch give us @@ -600,9 +601,7 @@ public class SSTableReader extends SSTable public Iterable getKeySamples(final Range range) { - final List samples = indexSummary.getKeys(); - - final List> indexRanges = getSampleIndexesForRanges(samples, Collections.singletonList(range)); + final List> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range)); if (indexRanges.isEmpty()) return Collections.emptyList(); @@ -635,10 +634,8 @@ public class SSTableReader extends SSTable public DecoratedKey next() { - RowPosition k = samples.get(idx++); - // the index should only contain valid row key, we only allow RowPosition in KeyPosition for search purposes - assert k instanceof DecoratedKey; - return (DecoratedKey)k; + byte[] bytes = indexSummary.getKey(idx++); + return partitioner.decorateKey(ByteBuffer.wrap(bytes)); } public void remove() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9851b73f/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index d798623..51aeaa6 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -104,7 +104,7 @@ public class SSTableWriter extends SSTable { dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); dataFile = SequentialWriter.open(new File(getFilename()), - !metadata.populateIoCacheOnFlush()); + !metadata.populateIoCacheOnFlush()); dataFile.setComputeDigest(); } @@ -346,7 +346,7 @@ public class SSTableWriter extends SSTable partitioner, ifile, dfile, - iwriter.summary, + iwriter.summary.build(partitioner), iwriter.bf, maxDataAge, sstableMetadata); @@ -433,7 +433,7 @@ public class SSTableWriter extends SSTable { private final SequentialWriter indexFile; public final SegmentedFile.Builder builder; - public final IndexSummary summary; + public final IndexSummaryBuilder summary; public final IFilter bf; private FileMark mark; @@ -442,7 +442,7 @@ public class SSTableWriter extends SSTable indexFile = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_INDEX)), !metadata.populateIoCacheOnFlush()); builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); - summary = new IndexSummary(keyCount); + summary = new IndexSummaryBuilder(keyCount); bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true); } @@ -495,9 +495,6 @@ public class SSTableWriter extends SSTable long position = indexFile.getFilePointer(); indexFile.close(); // calls force FileUtils.truncate(indexFile.getPath(), position); - - // finalize in-memory index state - summary.complete(); } public void mark() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9851b73f/src/java/org/apache/cassandra/utils/ByteBufferUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index 0b27d03..ae63bf7 100644 --- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -396,6 +396,14 @@ public class ByteBufferUtil return ByteBuffer.wrap(buff); } + public static byte[] readBytes(DataInput in, int length) throws IOException + { + assert length > 0; + byte[] bytes = new byte[length]; + in.readFully(bytes); + return bytes; + } + /** * Convert a byte buffer to an integer. * Does not change the byte buffer position. http://git-wip-us.apache.org/repos/asf/cassandra/blob/9851b73f/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index 64e70f7..d0670a0 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -284,8 +284,8 @@ public class SSTableReaderTest extends SchemaLoader // test to see if sstable can be opened as expected SSTableReader target = SSTableReader.open(desc); - Collection keySamples = target.getKeySamples(); - assert keySamples.size() == 1 && keySamples.iterator().next().equals(firstKey); + byte[][] keySamples = target.getKeySamples(); + assert keySamples.length == 1 && Arrays.equals(keySamples[0], firstKey.key.array()); assert target.first.equals(firstKey); assert target.last.equals(lastKey); }