cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject cassandra git commit: Allow cancellation of index summary redistribution
Date Fri, 11 Dec 2015 16:44:05 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 95dab2730 -> fc7075a41


Allow cancellation of index summary redistribution

Patch by Carl Yeksigian; reviewed by marcuse for CASSANDRA-8805


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc7075a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc7075a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc7075a4

Branch: refs/heads/cassandra-2.1
Commit: fc7075a41837301f3866333e0eb5c464715d888c
Parents: 95dab27
Author: Carl Yeksigian <carl@apache.org>
Authored: Tue Dec 8 12:22:25 2015 -0500
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Fri Dec 11 17:20:18 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/compaction/CompactionInfo.java |  14 +-
 .../db/compaction/CompactionManager.java        |  14 +
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../io/sstable/IndexSummaryManager.java         | 265 +--------------
 .../io/sstable/IndexSummaryRedistribution.java  | 338 +++++++++++++++++++
 .../io/sstable/IndexSummaryManagerTest.java     |  69 +++-
 7 files changed, 435 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 46cda65..2ee8b07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.13
+ * Allow cancellation of index summary redistribution (CASSANDRA-8805)
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Fix Stress profile parsing on Windows (CASSANDRA-10808)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index d086eef..e88143e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -98,9 +98,17 @@ public final class CompactionInfo implements Serializable
     public String toString()
     {
         StringBuilder buff = new StringBuilder();
-        buff.append(getTaskType()).append('@').append(getId());
-        buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily());
-        buff.append(", ").append(getCompleted()).append('/').append(getTotal());
+        buff.append(getTaskType());
+        if (cfm != null)
+        {
+            buff.append('@').append(getId()).append('(');
+            buff.append(getKeyspace()).append(", ").append(getColumnFamily()).append(", ");
+        }
+        else
+        {
+            buff.append('(');
+        }
+        buff.append(getCompleted()).append('/').append(getTotal());
         return buff.append(')').append(unit).toString();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2630ba2..9bddaf5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1222,6 +1222,20 @@ public class CompactionManager implements CompactionManagerMBean
         return executor.submit(runnable);
     }
 
+    public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution
redistribution) throws IOException
+    {
+        metrics.beginCompaction(redistribution);
+
+        try
+        {
+            return redistribution.redistributeSummaries();
+        }
+        finally
+        {
+            metrics.finishCompaction(redistribution);
+        }
+    }
+
     static int getDefaultGcBefore(ColumnFamilyStore cfs)
     {
         // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted
before now. We do not need to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 15d18f6..475b591 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -31,7 +31,8 @@ public enum OperationType
     /** Compaction for tombstone removal */
     TOMBSTONE_COMPACTION("Tombstone Compaction"),
     UNKNOWN("Unknown compaction type"),
-    ANTICOMPACTION("Anticompaction after repair");
+    ANTICOMPACTION("Anticompaction after repair"),
+    INDEX_SUMMARY("Index summary redistribution");
 
     private final String type;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 0c196ff..be5cc3c 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.io.sstable;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,7 +31,6 @@ import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
@@ -45,11 +42,10 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
-
 /**
  * Manages the fixed-size memory pool for index summaries, periodically resizing them
  * in order to give more memory to hot sstables and less memory to cold sstables.
@@ -255,261 +251,6 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
     @VisibleForTesting
     public static List<SSTableReader> redistributeSummaries(List<SSTableReader>
compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException
     {
-        long total = 0;
-        for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
-            total += sstable.getIndexSummaryOffHeapSize();
-
-        List<SSTableReader> oldFormatSSTables = new ArrayList<>();
-        for (SSTableReader sstable : nonCompacting)
-        {
-            // We can't change the sampling level of sstables with the old format, because
the serialization format
-            // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993
for details.)
-            logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
-            if (!sstable.descriptor.version.hasSamplingLevel)
-                oldFormatSSTables.add(sstable);
-        }
-        nonCompacting.removeAll(oldFormatSSTables);
-
-        logger.debug("Beginning redistribution of index summaries for {} sstables with memory
pool size {} MB; current spaced used is {} MB",
-                     nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0
/ 1024.0);
-
-        final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
-        double totalReadsPerSec = 0.0;
-        for (SSTableReader sstable : nonCompacting)
-        {
-            if (sstable.getReadMeter() != null)
-            {
-                Double readRate = sstable.getReadMeter().fifteenMinuteRate();
-                totalReadsPerSec += readRate;
-                readRates.put(sstable, readRate);
-            }
-        }
-        logger.trace("Total reads/sec across all sstables in index summary resize process:
{}", totalReadsPerSec);
-
-        // copy and sort by read rates (ascending)
-        List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
-        Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
-
-        long remainingBytes = memoryPoolBytes;
-        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
-            remainingBytes -= sstable.getIndexSummaryOffHeapSize();
-
-        logger.trace("Index summaries for compacting SSTables are using {} MB of space",
-                     (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
-        List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec,
remainingBytes);
-
-        total = 0;
-        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
-            total += sstable.getIndexSummaryOffHeapSize();
-        logger.debug("Completed resizing of index summaries; current approximate memory used:
{} MB",
-                     total / 1024.0 / 1024.0);
-
-        return newSSTables;
-    }
-
-    private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader>
sstables,
-                                                            double totalReadsPerSec, long
memoryPoolCapacity) throws IOException
-    {
-
-        List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() /
4);
-        List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
-        List<ResampleEntry> forceResample = new ArrayList<>();
-        List<ResampleEntry> forceUpsample = new ArrayList<>();
-        List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
-
-        // Going from the coldest to the hottest sstables, try to give each sstable an amount
of space proportional
-        // to the number of total reads/sec it handles.
-        long remainingSpace = memoryPoolCapacity;
-        for (SSTableReader sstable : sstables)
-        {
-            int minIndexInterval = sstable.metadata.getMinIndexInterval();
-            int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
-
-            double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
-            long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
-
-            // figure out how many entries our idealSpace would buy us, and pick a new sampling
level based on that
-            int currentNumEntries = sstable.getIndexSummarySize();
-            double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
-            long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
-            int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
-            int maxSummarySize = sstable.getMaxIndexSummarySize();
-
-            // if the min_index_interval changed, calculate what our current sampling level
would be under the new min
-            if (sstable.getMinIndexInterval() != minIndexInterval)
-            {
-                int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval
/ (double) sstable.getMinIndexInterval()));
-                maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval()
/ (double) minIndexInterval));
-                logger.trace("min_index_interval changed from {} to {}, so the current sampling
level for {} is effectively now {} (was {})",
-                             sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel,
currentSamplingLevel);
-                currentSamplingLevel = effectiveSamplingLevel;
-            }
-
-            int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel,
currentNumEntries, targetNumEntries,
-                    minIndexInterval, maxIndexInterval);
-            int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel,
maxSummarySize);
-            double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
-
-            logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({}
entries); considering moving " +
-                    "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
-                    sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel,
currentNumEntries,
-                    currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
-                    numEntriesAtNewSamplingLevel * avgEntrySize);
-
-            if (effectiveIndexInterval < minIndexInterval)
-            {
-                // The min_index_interval was changed; re-sample to match it.
-                logger.debug("Forcing resample of {} because the current index interval ({})
is below min_index_interval ({})",
-                        sstable, effectiveIndexInterval, minIndexInterval);
-                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                remainingSpace -= spaceUsed;
-            }
-            else if (effectiveIndexInterval > maxIndexInterval)
-            {
-                // The max_index_interval was lowered; force an upsample to the effective
minimum sampling level
-                logger.debug("Forcing upsample of {} because the current index interval ({})
is above max_index_interval ({})",
-                        sstable, effectiveIndexInterval, maxIndexInterval);
-                newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) /
maxIndexInterval);
-                numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel,
sstable.getMaxIndexSummarySize());
-                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
-            }
-            else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD &&
newSamplingLevel > currentSamplingLevel)
-            {
-                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
-            }
-            else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD &&
newSamplingLevel < currentSamplingLevel)
-            {
-                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                remainingSpace -= spaceUsed;
-            }
-            else
-            {
-                // keep the same sampling level
-                logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
-                remainingSpace -= sstable.getIndexSummaryOffHeapSize();
-                newSSTables.add(sstable);
-            }
-            totalReadsPerSec -= readsPerSec;
-        }
-
-        if (remainingSpace > 0)
-        {
-            Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample,
remainingSpace);
-            toDownsample = result.right;
-            newSSTables.addAll(result.left);
-        }
-
-        // downsample first, then upsample
-        toDownsample.addAll(forceResample);
-        toDownsample.addAll(toUpsample);
-        toDownsample.addAll(forceUpsample);
-        Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
-        Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
-        for (ResampleEntry entry : toDownsample)
-        {
-            SSTableReader sstable = entry.sstable;
-            logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original
number of entries",
-                         sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
-                         entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
-            ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
-            SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
-            DataTracker tracker = cfs.getDataTracker();
-
-            replacedByTracker.put(tracker, sstable);
-            replacementsByTracker.put(tracker, replacement);
-        }
-
-        for (DataTracker tracker : replacedByTracker.keySet())
-        {
-            tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
-            newSSTables.addAll(replacementsByTracker.get(tracker));
-        }
-
-        return newSSTables;
-    }
-
-    @VisibleForTesting
-    static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry>
toDownsample, long remainingSpace)
-    {
-        // sort by the amount of space regained by doing the downsample operation; we want
to try to avoid operations
-        // that will make little difference.
-        Collections.sort(toDownsample, new Comparator<ResampleEntry>()
-        {
-            public int compare(ResampleEntry o1, ResampleEntry o2)
-            {
-                return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
-                                      o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
-            }
-        });
-
-        int noDownsampleCutoff = 0;
-        List<SSTableReader> willNotDownsample = new ArrayList<>();
-        while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
-        {
-            ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
-
-            long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
-            // see if we have enough leftover space to keep the current sampling level
-            if (extraSpaceRequired <= remainingSpace)
-            {
-                logger.trace("Using leftover space to keep {} at the current sampling level
({})",
-                             entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
-                willNotDownsample.add(entry.sstable);
-                remainingSpace -= extraSpaceRequired;
-            }
-            else
-            {
-                break;
-            }
-
-            noDownsampleCutoff++;
-        }
-        return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
-    }
-
-    private static class ResampleEntry
-    {
-        public final SSTableReader sstable;
-        public final long newSpaceUsed;
-        public final int newSamplingLevel;
-
-        public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
-        {
-            this.sstable = sstable;
-            this.newSpaceUsed = newSpaceUsed;
-            this.newSamplingLevel = newSamplingLevel;
-        }
-    }
-
-    /** Utility class for sorting sstables by their read rates. */
-    private static class ReadRateComparator implements Comparator<SSTableReader>
-    {
-        private final Map<SSTableReader, Double> readRates;
-
-        public ReadRateComparator(Map<SSTableReader, Double> readRates)
-        {
-            this.readRates = readRates;
-        }
-
-        @Override
-        public int compare(SSTableReader o1, SSTableReader o2)
-        {
-            Double readRate1 = readRates.get(o1);
-            Double readRate2 = readRates.get(o2);
-            if (readRate1 == null && readRate2 == null)
-                return 0;
-            else if (readRate1 == null)
-                return -1;
-            else if (readRate2 == null)
-                return 1;
-            else
-                return Double.compare(readRate1, readRate2);
-        }
+        return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting,
nonCompacting, memoryPoolBytes));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
new file mode 100644
index 0000000..adb3e4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+
+public class IndexSummaryRedistribution extends CompactionInfo.Holder
+{
+    private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class);
+
+    private final List<SSTableReader> compacting;
+    private final List<SSTableReader> nonCompacting;
+    private final long memoryPoolBytes;
+    private volatile long remainingSpace;
+
+    public IndexSummaryRedistribution(List<SSTableReader> compacting, List<SSTableReader>
nonCompacting, long memoryPoolBytes)
+    {
+        this.compacting = compacting;
+        this.nonCompacting = nonCompacting;
+        this.memoryPoolBytes = memoryPoolBytes;
+    }
+
+    public List<SSTableReader> redistributeSummaries() throws IOException
+    {
+        long total = 0;
+        for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
+            total += sstable.getIndexSummaryOffHeapSize();
+
+        List<SSTableReader> oldFormatSSTables = new ArrayList<>();
+        for (SSTableReader sstable : nonCompacting)
+        {
+            // We can't change the sampling level of sstables with the old format, because
the serialization format
+            // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993
for details.)
+            logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+            if (!sstable.descriptor.version.hasSamplingLevel)
+                oldFormatSSTables.add(sstable);
+        }
+        nonCompacting.removeAll(oldFormatSSTables);
+
+        logger.debug("Beginning redistribution of index summaries for {} sstables with memory
pool size {} MB; current spaced used is {} MB",
+                     nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0
/ 1024.0);
+
+        final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
+        double totalReadsPerSec = 0.0;
+        for (SSTableReader sstable : nonCompacting)
+        {
+            if (isStopRequested())
+                throw new CompactionInterruptedException(getCompactionInfo());
+
+            if (sstable.getReadMeter() != null)
+            {
+                Double readRate = sstable.getReadMeter().fifteenMinuteRate();
+                totalReadsPerSec += readRate;
+                readRates.put(sstable, readRate);
+            }
+        }
+        logger.trace("Total reads/sec across all sstables in index summary resize process:
{}", totalReadsPerSec);
+
+        // copy and sort by read rates (ascending)
+        List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
+        Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
+
+        long remainingBytes = memoryPoolBytes;
+        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+            remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+
+        logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+                     (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
+        List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec,
remainingBytes);
+
+        total = 0;
+        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+            total += sstable.getIndexSummaryOffHeapSize();
+        logger.debug("Completed resizing of index summaries; current approximate memory used:
{} MB",
+                     total / 1024.0 / 1024.0);
+
+        return newSSTables;
+    }
+
+    private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+                                                     double totalReadsPerSec, long memoryPoolCapacity)
throws IOException
+    {
+
+        List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() /
4);
+        List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+        List<ResampleEntry> forceResample = new ArrayList<>();
+        List<ResampleEntry> forceUpsample = new ArrayList<>();
+        List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+
+        // Going from the coldest to the hottest sstables, try to give each sstable an amount
of space proportional
+        // to the number of total reads/sec it handles.
+        remainingSpace = memoryPoolCapacity;
+        for (SSTableReader sstable : sstables)
+        {
+            if (isStopRequested())
+                throw new CompactionInterruptedException(getCompactionInfo());
+
+            int minIndexInterval = sstable.metadata.getMinIndexInterval();
+            int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
+
+            double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
+            long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+
+            // figure out how many entries our idealSpace would buy us, and pick a new sampling
level based on that
+            int currentNumEntries = sstable.getIndexSummarySize();
+            double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+            long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+            int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+            int maxSummarySize = sstable.getMaxIndexSummarySize();
+
+            // if the min_index_interval changed, calculate what our current sampling level
would be under the new min
+            if (sstable.getMinIndexInterval() != minIndexInterval)
+            {
+                int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval
/ (double) sstable.getMinIndexInterval()));
+                maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval()
/ (double) minIndexInterval));
+                logger.trace("min_index_interval changed from {} to {}, so the current sampling
level for {} is effectively now {} (was {})",
+                             sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel,
currentSamplingLevel);
+                currentSamplingLevel = effectiveSamplingLevel;
+            }
+
+            int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel,
currentNumEntries, targetNumEntries,
+                                                                              minIndexInterval,
maxIndexInterval);
+            int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel,
maxSummarySize);
+            double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
+
+            logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({}
entries); considering moving " +
+                         "from level {} ({} entries, {} bytes) to level {} ({} entries, {}
bytes)",
+                         sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries,
currentSamplingLevel, currentNumEntries,
+                         currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
+                         numEntriesAtNewSamplingLevel * avgEntrySize);
+
+            if (effectiveIndexInterval < minIndexInterval)
+            {
+                // The min_index_interval was changed; re-sample to match it.
+                logger.debug("Forcing resample of {} because the current index interval ({})
is below min_index_interval ({})",
+                             sstable, effectiveIndexInterval, minIndexInterval);
+                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                remainingSpace -= spaceUsed;
+            }
+            else if (effectiveIndexInterval > maxIndexInterval)
+            {
+                // The max_index_interval was lowered; force an upsample to the effective
minimum sampling level
+                logger.debug("Forcing upsample of {} because the current index interval ({})
is above max_index_interval ({})",
+                             sstable, effectiveIndexInterval, maxIndexInterval);
+                newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) /
maxIndexInterval);
+                numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel,
sstable.getMaxIndexSummarySize());
+                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+            }
+            else if (targetNumEntries >= currentNumEntries * IndexSummaryManager.UPSAMPLE_THRESHOLD
&& newSamplingLevel > currentSamplingLevel)
+            {
+                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+            }
+            else if (targetNumEntries < currentNumEntries * IndexSummaryManager.DOWNSAMPLE_THESHOLD
&& newSamplingLevel < currentSamplingLevel)
+            {
+                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                remainingSpace -= spaceUsed;
+            }
+            else
+            {
+                // keep the same sampling level
+                logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+                remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+                newSSTables.add(sstable);
+            }
+            totalReadsPerSec -= readsPerSec;
+        }
+
+        if (remainingSpace > 0)
+        {
+            Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample,
remainingSpace);
+            toDownsample = result.right;
+            newSSTables.addAll(result.left);
+        }
+
+        // downsample first, then upsample
+        toDownsample.addAll(forceResample);
+        toDownsample.addAll(toUpsample);
+        toDownsample.addAll(forceUpsample);
+        Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
+        Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
+
+        try
+        {
+            for (ResampleEntry entry : toDownsample)
+            {
+                if (isStopRequested())
+                    throw new CompactionInterruptedException(getCompactionInfo());
+
+                SSTableReader sstable = entry.sstable;
+                logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the
original number of entries",
+                             sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
+                             entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
+                ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+                DataTracker tracker = cfs.getDataTracker();
+                SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs,
entry.newSamplingLevel);
+                newSSTables.add(replacement);
+                replacedByTracker.put(tracker, sstable);
+                replacementsByTracker.put(tracker, replacement);
+            }
+        }
+        finally
+        {
+            for (DataTracker tracker : replacedByTracker.keySet())
+                tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
+        }
+
+        return newSSTables;
+    }
+
+    @VisibleForTesting
+    static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry>
toDownsample, long remainingSpace)
+    {
+        // sort by the amount of space regained by doing the downsample operation; we want
to try to avoid operations
+        // that will make little difference.
+        Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+        {
+            public int compare(ResampleEntry o1, ResampleEntry o2)
+            {
+                return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+                                      o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+            }
+        });
+
+        int noDownsampleCutoff = 0;
+        List<SSTableReader> willNotDownsample = new ArrayList<>();
+        while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+        {
+            ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+
+            long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+            // see if we have enough leftover space to keep the current sampling level
+            if (extraSpaceRequired <= remainingSpace)
+            {
+                logger.trace("Using leftover space to keep {} at the current sampling level
({})",
+                             entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+                willNotDownsample.add(entry.sstable);
+                remainingSpace -= extraSpaceRequired;
+            }
+            else
+            {
+                break;
+            }
+
+            noDownsampleCutoff++;
+        }
+        return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+    }
+
+    public CompactionInfo getCompactionInfo()
+    {
+        return new CompactionInfo(OperationType.INDEX_SUMMARY, (remainingSpace - memoryPoolBytes),
memoryPoolBytes, "bytes");
+    }
+
+    /** Utility class for sorting sstables by their read rates. */
+    private static class ReadRateComparator implements Comparator<SSTableReader>
+    {
+        private final Map<SSTableReader, Double> readRates;
+
+        ReadRateComparator(Map<SSTableReader, Double> readRates)
+        {
+            this.readRates = readRates;
+        }
+
+        @Override
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            Double readRate1 = readRates.get(o1);
+            Double readRate2 = readRates.get(o2);
+            if (readRate1 == null && readRate2 == null)
+                return 0;
+            else if (readRate1 == null)
+                return -1;
+            else if (readRate2 == null)
+                return 1;
+            else
+                return Double.compare(readRate1, readRate2);
+        }
+    }
+
    /**
     * Immutable plan for resampling one sstable's index summary: the target
     * sampling level and the estimated off-heap size of the summary at that level.
     */
    private static class ResampleEntry
    {
        public final SSTableReader sstable;
        // estimated off-heap bytes the summary will occupy at newSamplingLevel
        public final long newSpaceUsed;
        public final int newSamplingLevel;

        ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
        {
            this.sstable = sstable;
            this.newSpaceUsed = newSpaceUsed;
            this.newSamplingLevel = newSamplingLevel;
        }
    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 64d3354..63928e2 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -22,12 +22,14 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,11 +38,12 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.metrics.RestorableMeter;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
@@ -75,6 +78,11 @@ public class IndexSummaryManagerTest extends SchemaLoader
     @After
     public void afterTest()
     {
+        for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions())
+        {
+            holder.stop();
+        }
+
         String ksname = "Keyspace1";
         String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
         Keyspace keyspace = Keyspace.open(ksname);
@@ -499,4 +507,59 @@ public class IndexSummaryManagerTest extends SchemaLoader
                 assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
         }
     }
+
    /**
     * Verifies that an in-flight index summary redistribution can be cancelled:
     * the background task must surface a CompactionInterruptedException, and the
     * column family's sstables must be exactly the set that existed before the
     * redistribution started.
     */
    @Test
    public void testCancelIndex() throws Exception
    {
        String ksname = "Keyspace1";
        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
        Keyspace keyspace = Keyspace.open(ksname);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
        final int numSSTables = 4;
        int numRows = 256;
        createSSTables(ksname, cfname, numSSTables, numRows);

        final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
        // give every sstable a nonzero read rate so all are considered for resampling
        for (SSTableReader sstable : sstables)
            sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));

        final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();

        // everything should get cut in half
        final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
        Thread t = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    // memory pool sized for half the sstables, forcing downsampling work
                    redistributeSummaries(Collections.<SSTableReader>emptyList(), sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
                }
                catch (CompactionInterruptedException ex)
                {
                    // expected on cancellation; recorded for the assertion below
                    exception.set(ex);
                }
                catch (IOException ignored)
                {
                }
            }
        });
        t.start();
        // wait until the redistribution registers as an active compaction (or the
        // thread dies), then cancel it by operation type
        while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
            Thread.sleep(1);
        CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
        t.join();

        assertNotNull("Expected compaction interrupted exception", exception.get());
        assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());

        // cancellation must leave the sstable set untouched
        Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
        Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables());
        Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
        assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
                                 Joiner.on(",").join(disjoint)),
                   disjoint.isEmpty());

        validateData(cfs, numRows);
    }
 }


Mime
View raw message