Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AED051033A for ; Fri, 22 Nov 2013 18:13:12 +0000 (UTC) Received: (qmail 73658 invoked by uid 500); 22 Nov 2013 18:13:10 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 73628 invoked by uid 500); 22 Nov 2013 18:13:09 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 73611 invoked by uid 99); 22 Nov 2013 18:13:08 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Nov 2013 18:13:08 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B384A9022E4; Fri, 22 Nov 2013 18:13:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Fri, 22 Nov 2013 18:13:07 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] allocate fixed index summary memory pool and resample cold index summaries to use less memory patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-5519 Updated Branches: refs/heads/trunk 40598efa6 -> dbd1a727b http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java new file mode 100644 index 0000000..aac70ec --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.metrics.RestorableMeter; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; +import static org.apache.cassandra.io.sstable.Downsampling.MIN_SAMPLING_LEVEL; +import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD; +import static org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD; +import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class IndexSummaryManagerTest extends SchemaLoader +{ + private static final Logger logger = LoggerFactory.getLogger(IndexSummaryManagerTest.class); + + + private static long totalOffHeapSize(List sstables) + { + long total = 0; + for (SSTableReader sstable : sstables) + total += sstable.getIndexSummaryOffHeapSize(); + + return total; + } + + private static List resetSummaries(List sstables, long originalOffHeapSize) throws IOException + { + for (SSTableReader sstable : sstables) + sstable.readMeter = new RestorableMeter(100.0, 100.0); + + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, originalOffHeapSize * sstables.size()); + for (SSTableReader sstable : sstables) + assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); + + return sstables; + } + + private void validateData(ColumnFamilyStore cfs, int numRows) + { + for (int i = 0; i < numRows; i++) + { + DecoratedKey key = Util.dk(String.valueOf(i)); + QueryFilter filter = QueryFilter.getIdentityFilter(key, cfs.getColumnFamilyName(), System.currentTimeMillis()); + ColumnFamily row = cfs.getColumnFamily(filter); + assertNotNull(row); + Column column = row.getColumn(ByteBufferUtil.bytes("column")); + assertNotNull(column); + assertEquals(100, column.value().array().length); + } + } + + private Comparator hotnessComparator = new Comparator() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + return Double.compare(o1.readMeter.fifteenMinuteRate(), o2.readMeter.fifteenMinuteRate()); + } + }; + + @Test + public void testRedistributeSummaries() throws IOException + { + String ksname = "Keyspace1"; + String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching + Keyspace keyspace = Keyspace.open(ksname); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + + int numSSTables = 4; + int numRows = 256; + for (int sstable = 0; sstable < numSSTables; sstable++) + { + for (int row = 0; row < numRows; row++) + { + DecoratedKey key = Util.dk(String.valueOf(row)); + RowMutation rm = new RowMutation(ksname, key.key); + rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0); + rm.apply(); + } + cfs.forceBlockingFlush(); + } + + List sstables = new ArrayList<>(cfs.getSSTables()); + assertEquals(numSSTables, sstables.size()); + validateData(cfs, numRows); + + for (SSTableReader sstable : sstables) + sstable.readMeter = new RestorableMeter(100.0, 100.0); + + long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize(); + + // there should be enough space to not downsample anything + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * numSSTables)); + for (SSTableReader sstable : sstables) + assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); + assertEquals(singleSummaryOffHeapSpace * numSSTables, totalOffHeapSize(sstables)); + validateData(cfs, numRows); + + // everything should get cut in half + assert sstables.size() == 4; + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * (numSSTables / 2))); + for (SSTableReader sstable : sstables) + assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel()); + validateData(cfs, numRows); + + // everything should get cut to a quarter + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * (numSSTables / 4))); + for (SSTableReader sstable : sstables) + assertEquals(BASE_SAMPLING_LEVEL / 4, sstable.getIndexSummarySamplingLevel()); + validateData(cfs, numRows); + + // upsample back up to half + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables,(singleSummaryOffHeapSpace * (numSSTables / 2))); + assert sstables.size() == 4; + for (SSTableReader sstable : sstables) + assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel()); + validateData(cfs, numRows); + + // upsample back up to the original index summary + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * numSSTables)); + for (SSTableReader sstable : sstables) + assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); + validateData(cfs, numRows); + + // make two of the four sstables cold, only leave enough space for three full index summaries, + // so the two cold sstables should get downsampled to be half of their original size + sstables.get(0).readMeter = new RestorableMeter(50.0, 50.0); + sstables.get(1).readMeter = new RestorableMeter(50.0, 50.0); + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3)); + Collections.sort(sstables, hotnessComparator); + assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel()); + assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel()); + assertEquals(BASE_SAMPLING_LEVEL, sstables.get(2).getIndexSummarySamplingLevel()); + assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel()); + validateData(cfs, numRows); + + // small increases or decreases in the read rate don't result in downsampling or upsampling + double lowerRate = 50.0 * (DOWNSAMPLE_THESHOLD + (DOWNSAMPLE_THESHOLD * 0.10)); + double higherRate = 50.0 * (UPSAMPLE_THRESHOLD - (UPSAMPLE_THRESHOLD * 0.10)); + sstables.get(0).readMeter = new RestorableMeter(lowerRate, lowerRate); + sstables.get(1).readMeter = new RestorableMeter(higherRate, higherRate); + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3)); + Collections.sort(sstables, hotnessComparator); + assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel()); + assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel()); + assertEquals(BASE_SAMPLING_LEVEL, sstables.get(2).getIndexSummarySamplingLevel()); + assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel()); + validateData(cfs, numRows); + + // reset, and then this time, leave enough space for one of the cold sstables to not get downsampled + sstables = resetSummaries(sstables, singleSummaryOffHeapSpace); + sstables.get(0).readMeter = new RestorableMeter(1.0, 1.0); + sstables.get(1).readMeter = new RestorableMeter(2.0, 2.0); + sstables.get(2).readMeter = new RestorableMeter(1000.0, 1000.0); + sstables.get(3).readMeter = new RestorableMeter(1000.0, 1000.0); + + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3) + 50); + Collections.sort(sstables, hotnessComparator); + + if (sstables.get(0).getIndexSummarySamplingLevel() == MIN_SAMPLING_LEVEL) + assertEquals(BASE_SAMPLING_LEVEL, sstables.get(1).getIndexSummarySamplingLevel()); + else + assertEquals(BASE_SAMPLING_LEVEL, sstables.get(0).getIndexSummarySamplingLevel()); + + assertEquals(BASE_SAMPLING_LEVEL, sstables.get(2).getIndexSummarySamplingLevel()); + assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel()); + validateData(cfs, numRows); + + + // Cause a mix of upsampling and downsampling. We'll leave enough space for two full index summaries. The two + // coldest sstables will get downsampled to 8/128 of their size, leaving us with 1 and 112/128th index + // summaries worth of space. The hottest sstable should get a full index summary, and the one in the middle + // should get the remainder. + sstables.get(0).readMeter = new RestorableMeter(0.0, 0.0); + sstables.get(1).readMeter = new RestorableMeter(0.0, 0.0); + sstables.get(2).readMeter = new RestorableMeter(100, 100); + sstables.get(3).readMeter = new RestorableMeter(128.0, 128.0); + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (100.0 / BASE_SAMPLING_LEVEL)))); + Collections.sort(sstables, hotnessComparator); + assertEquals(MIN_SAMPLING_LEVEL, sstables.get(0).getIndexSummarySamplingLevel()); + assertEquals(MIN_SAMPLING_LEVEL, sstables.get(1).getIndexSummarySamplingLevel()); + assertTrue(sstables.get(2).getIndexSummarySamplingLevel() > MIN_SAMPLING_LEVEL); + assertTrue(sstables.get(2).getIndexSummarySamplingLevel() < BASE_SAMPLING_LEVEL); + assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel()); + validateData(cfs, numRows); + + // Don't leave enough space for even the minimal index summaries + sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 100); + for (SSTableReader sstable : sstables) + assertEquals(MIN_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); + validateData(cfs, numRows); + } + + @Test + public void testRebuildAtSamplingLevel() throws IOException + { + String ksname = "Keyspace1"; + String cfname = "StandardLowIndexInterval"; + Keyspace keyspace = Keyspace.open(ksname); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + + int numRows = 256; + for (int row = 0; row < numRows; row++) + { + DecoratedKey key = Util.dk(String.valueOf(row)); + RowMutation rm = new RowMutation(ksname, key.key); + rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0); + rm.apply(); + } + cfs.forceBlockingFlush(); + + List sstables = new ArrayList<>(cfs.getSSTables()); + assertEquals(1, sstables.size()); + SSTableReader sstable = sstables.get(0); + + for (int samplingLevel = MIN_SAMPLING_LEVEL; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++) + { + sstable = sstable.cloneWithNewSummarySamplingLevel(samplingLevel); + assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel()); + int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getIndexInterval() * BASE_SAMPLING_LEVEL); + assertEquals(expectedSize, sstable.getIndexSummarySize(), 1); + } + } + + @Test + public void testJMXFunctions() throws IOException + { + IndexSummaryManager manager = IndexSummaryManager.instance; + + // resize interval + assertNotNull(manager.getResizeIntervalInMinutes()); + manager.setResizeIntervalInMinutes(-1); + assertNull(manager.getTimeToNextResize(TimeUnit.MINUTES)); + + manager.setResizeIntervalInMinutes(10); + assertEquals(10, manager.getResizeIntervalInMinutes()); + assertEquals(10, manager.getTimeToNextResize(TimeUnit.MINUTES), 1); + manager.setResizeIntervalInMinutes(15); + assertEquals(15, manager.getResizeIntervalInMinutes()); + assertEquals(15, manager.getTimeToNextResize(TimeUnit.MINUTES), 2); + + // memory pool capacity + assertTrue(manager.getMemoryPoolCapacityInMB() >= 0); + manager.setMemoryPoolCapacityInMB(10); + assertEquals(10, manager.getMemoryPoolCapacityInMB()); + + String ksname = "Keyspace1"; + String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching + Keyspace keyspace = Keyspace.open(ksname); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + + int numSSTables = 2; + int numRows = 10; + for (int sstable = 0; sstable < numSSTables; sstable++) + { + for (int row = 0; row < numRows; row++) + { + DecoratedKey key = Util.dk(String.valueOf(row)); + RowMutation rm = new RowMutation(ksname, key.key); + rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0); + rm.apply(); + } + cfs.forceBlockingFlush(); + } + + assertEquals(1.0, manager.getAverageSamplingRatio(), 0.001); + Map samplingRatios = manager.getSamplingRatios(); + for (Map.Entry entry : samplingRatios.entrySet()) + assertEquals(1.0, entry.getValue(), 0.001); + + manager.setMemoryPoolCapacityInMB(0); + manager.redistributeSummaries(); + assertTrue(manager.getAverageSamplingRatio() < 0.99); + samplingRatios = manager.getSamplingRatios(); + for (Map.Entry entry : samplingRatios.entrySet()) + { + if (entry.getKey().contains("StandardLowIndexInterval")) + assertTrue(entry.getValue() < 0.9); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java index 8e73161..8d013f9 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java @@ -23,22 +23,25 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import java.util.*; import com.google.common.collect.Lists; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.*; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; -import static org.junit.Assert.assertArrayEquals; +import static org.apache.cassandra.io.sstable.IndexSummaryBuilder.downsample; +import static org.apache.cassandra.io.sstable.IndexSummaryBuilder.entriesAtSamplingLevel; +import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; +import static org.apache.cassandra.io.sstable.Downsampling.MIN_SAMPLING_LEVEL; + +import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; public class IndexSummaryTest @@ -73,13 +76,13 @@ public class IndexSummaryTest Pair, IndexSummary> random = generateRandomIndex(100, 1); ByteArrayOutputStream aos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(aos); - IndexSummary.serializer.serialize(random.right, dos); + IndexSummary.serializer.serialize(random.right, dos, false); // write junk dos.writeUTF("JUNK"); dos.writeUTF("JUNK"); FileUtils.closeQuietly(dos); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray())); - IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner()); + IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner(), false); for (int i = 0; i < 100; i++) assertEquals(i, is.binarySearch(random.left.get(i))); // read the junk @@ -92,7 +95,7 @@ public class IndexSummaryTest public void testAddEmptyKey() throws Exception { IPartitioner p = new RandomPartitioner(); - IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1); + IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL); builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0); IndexSummary summary = builder.build(p); assertEquals(1, summary.size()); @@ -101,9 +104,9 @@ public class IndexSummaryTest ByteArrayOutputStream aos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(aos); - IndexSummary.serializer.serialize(summary, dos); + IndexSummary.serializer.serialize(summary, dos, false); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray())); - IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p); + IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false); assertEquals(1, loaded.size()); assertEquals(summary.getPosition(0), loaded.getPosition(0)); @@ -113,7 +116,7 @@ public class IndexSummaryTest private Pair, IndexSummary> generateRandomIndex(int size, int interval) { List list = Lists.newArrayList(); - IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval); + IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval, BASE_SAMPLING_LEVEL); for (int i = 0; i < size; i++) { UUID uuid = UUID.randomUUID(); @@ -126,4 +129,128 @@ public class IndexSummaryTest IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner()); return Pair.create(list, summary); } -} + + @Test + public void testDownsamplePatterns() + { + assertEquals(Arrays.asList(0), Downsampling.getSamplingPattern(0)); + assertEquals(Arrays.asList(0), Downsampling.getSamplingPattern(1)); + + assertEquals(Arrays.asList(0, 1), Downsampling.getSamplingPattern(2)); + assertEquals(Arrays.asList(0, 2, 1, 3), Downsampling.getSamplingPattern(4)); + assertEquals(Arrays.asList(0, 4, 2, 6, 1, 5, 3, 7), Downsampling.getSamplingPattern(8)); + assertEquals(Arrays.asList(0, 8, 4, 12, 2, 10, 6, 14, 1, 9, 5, 13, 3, 11, 7, 15), Downsampling.getSamplingPattern(16)); + } + + private static boolean shouldSkip(int index, List startPoints) + { + for (int start : startPoints) + { + if ((index - start) % BASE_SAMPLING_LEVEL == 0) + return true; + } + return false; + } + + @Test + public void testDownsample() + { + final int NUM_KEYS = 4096; + final int INDEX_INTERVAL = 128; + final int ORIGINAL_NUM_ENTRIES = NUM_KEYS / INDEX_INTERVAL; + + + Pair, IndexSummary> random = generateRandomIndex(NUM_KEYS, INDEX_INTERVAL); + List keys = random.left; + IndexSummary original = random.right; + + // sanity check on the original index summary + for (int i = 0; i < ORIGINAL_NUM_ENTRIES; i++) + assertEquals(keys.get(i * INDEX_INTERVAL).key, ByteBuffer.wrap(original.getKey(i))); + + List samplePattern = Downsampling.getSamplingPattern(BASE_SAMPLING_LEVEL); + + // downsample by one level, then two levels, then three levels... + int downsamplingRound = 1; + for (int samplingLevel = BASE_SAMPLING_LEVEL - 1; samplingLevel >= MIN_SAMPLING_LEVEL; samplingLevel--) + { + IndexSummary downsampled = downsample(original, samplingLevel, DatabaseDescriptor.getPartitioner()); + assertEquals(entriesAtSamplingLevel(samplingLevel, original.getMaxNumberOfEntries()), downsampled.size()); + + int sampledCount = 0; + List skipStartPoints = samplePattern.subList(0, downsamplingRound); + for (int i = 0; i < ORIGINAL_NUM_ENTRIES; i++) + { + if (!shouldSkip(i, skipStartPoints)) + { + assertEquals(keys.get(i * INDEX_INTERVAL).key, ByteBuffer.wrap(downsampled.getKey(sampledCount))); + sampledCount++; + } + } + downsamplingRound++; + } + + // downsample one level each time + IndexSummary previous = original; + downsamplingRound = 1; + for (int downsampleLevel = BASE_SAMPLING_LEVEL - 1; downsampleLevel >= MIN_SAMPLING_LEVEL; downsampleLevel--) + { + IndexSummary downsampled = downsample(previous, downsampleLevel, DatabaseDescriptor.getPartitioner()); + assertEquals(entriesAtSamplingLevel(downsampleLevel, original.getMaxNumberOfEntries()), downsampled.size()); + + int sampledCount = 0; + List skipStartPoints = samplePattern.subList(0, downsamplingRound); + for (int i = 0; i < ORIGINAL_NUM_ENTRIES; i++) + { + if (!shouldSkip(i, skipStartPoints)) + { + assertEquals(keys.get(i * INDEX_INTERVAL).key, ByteBuffer.wrap(downsampled.getKey(sampledCount))); + sampledCount++; + } + } + + previous = downsampled; + downsamplingRound++; + } + } + + @Test + public void testOriginalIndexLookup() + { + for (int i = BASE_SAMPLING_LEVEL; i >= MIN_SAMPLING_LEVEL; i--) + assertEquals(i, Downsampling.getOriginalIndexes(i).size()); + + ArrayList full = new ArrayList<>(); + for (int i = 0; i < BASE_SAMPLING_LEVEL; i++) + full.add(i); + + assertEquals(full, Downsampling.getOriginalIndexes(BASE_SAMPLING_LEVEL)); + // the entry at index 0 is the first to go + assertEquals(full.subList(1, full.size()), Downsampling.getOriginalIndexes(BASE_SAMPLING_LEVEL - 1)); + + // spot check a few values (these depend on BASE_SAMPLING_LEVEL being 128) + assert BASE_SAMPLING_LEVEL == 128; + assertEquals(Arrays.asList(31, 63, 95, 127), Downsampling.getOriginalIndexes(4)); + assertEquals(Arrays.asList(63, 127), Downsampling.getOriginalIndexes(2)); + assertEquals(Arrays.asList(), Downsampling.getOriginalIndexes(0)); + } + + @Test + public void testGetNumberOfSkippedEntriesAfterIndex() + { + int indexInterval = 128; + for (int i = 0; i < BASE_SAMPLING_LEVEL; i++) + assertEquals(indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(i, BASE_SAMPLING_LEVEL, indexInterval)); + + // with one round of downsampling, only the first summary has been removed, so only the last index will have + // double the gap until the next sample + for (int i = 0; i < BASE_SAMPLING_LEVEL - 2; i++) + assertEquals(indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(i, BASE_SAMPLING_LEVEL - 1, indexInterval)); + assertEquals(indexInterval * 2, Downsampling.getEffectiveIndexIntervalAfterIndex(BASE_SAMPLING_LEVEL - 2, BASE_SAMPLING_LEVEL - 1, indexInterval)); + + // at samplingLevel=2, the retained summary points are [63, 127] (assumes BASE_SAMPLING_LEVEL is 128) + assert BASE_SAMPLING_LEVEL == 128; + assertEquals(64 * indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(0, 2, indexInterval)); + assertEquals(64 * indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(1, 2, indexInterval)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index b771e72..6bf17fd 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -23,17 +23,20 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import org.junit.Assert; import com.google.common.collect.Sets; import org.junit.Test; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; @@ -54,11 +57,17 @@ import org.apache.cassandra.io.util.SegmentedFile; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; + +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; @RunWith(OrderedJUnit4ClassRunner.class) public class SSTableReaderTest extends SchemaLoader { + private static final Logger logger = LoggerFactory.getLogger(SSTableReaderTest.class); + static Token t(int i) { return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(String.valueOf(i))); @@ -252,8 +261,8 @@ public class SSTableReaderTest extends SchemaLoader // test to see if sstable can be opened as expected SSTableReader target = SSTableReader.open(desc); - Assert.assertEquals(target.getKeySampleSize(), 1); - Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.key), target.getKeySample(0)); + Assert.assertEquals(target.getIndexSummarySize(), 1); + Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.key), target.getIndexSummaryKey(0)); assert target.first.equals(firstKey); assert target.last.equals(lastKey); } @@ -341,6 +350,64 @@ public class SSTableReaderTest extends SchemaLoader assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading"; } + @Test + public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException + { + Keyspace keyspace = Keyspace.open("Keyspace1"); + final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching + CompactionManager.instance.disableAutoCompaction(); + + final int NUM_ROWS = 1000; + for (int j = 0; j < NUM_ROWS; j++) + { + ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j)); + RowMutation rm = new RowMutation("Keyspace1", key); + rm.add("StandardLowIndexInterval", ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j); + rm.apply(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store); + + Collection sstables = store.getSSTables(); + assert sstables.size() == 1; + final SSTableReader sstable = sstables.iterator().next(); + + ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5); + List futures = new ArrayList<>(NUM_ROWS * 2); + for (int i = 0; i < NUM_ROWS; i++) + { + final ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", i)); + final int index = i; + + futures.add(executor.submit(new Runnable() + { + public void run() + { + ColumnFamily result = store.getColumnFamily(sstable.partitioner.decorateKey(key), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 100, 100); + assertFalse(result.isEmpty()); + assertEquals(0, ByteBufferUtil.compare(String.format("%3d", index).getBytes(), result.getColumn(ByteBufferUtil.bytes("0")).value())); + } + })); + + futures.add(executor.submit(new Runnable() + { + public void run() + { + Iterable results = store.keySamples( + new Range<>(sstable.partitioner.getMinimumToken(), sstable.partitioner.getToken(key))); + assertTrue(results.iterator().hasNext()); + } + })); + } + + SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(Downsampling.MIN_SAMPLING_LEVEL); + store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement)); + for (Future future : futures) + future.get(); + + assertEquals(sstable.estimatedKeys(), replacement.estimatedKeys(), 1); + } + private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS) throws IOException { assert "Indexed1".equals(indexedCFS.name);