cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject cassandra git commit: Remove wrapping compaction strategy
Date Fri, 05 Jun 2015 08:46:15 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 13ecf33d9 -> 7df3a5c99


Remove wrapping compaction strategy

Patch by marcuse; reviewed by JoshuaMcKenzie for CASSANDRA-9342


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7df3a5c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7df3a5c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7df3a5c9

Branch: refs/heads/trunk
Commit: 7df3a5c99da4a352b3b8599b2965a86dec1777e5
Parents: 13ecf33
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Mon May 11 13:43:13 2015 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Fri Jun 5 10:34:04 2015 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |   2 -
 .../cassandra/db/CollationController.java       |   2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  35 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +-
 .../db/compaction/CompactionManager.java        |  16 +-
 .../compaction/CompactionStrategyManager.java   | 463 +++++++++++++++++++
 .../cassandra/db/compaction/CompactionTask.java |   5 +-
 .../db/compaction/LeveledManifest.java          |  19 +-
 .../cassandra/db/compaction/Upgrader.java       |   8 +-
 .../compaction/WrappingCompactionStrategy.java  | 374 ---------------
 .../cassandra/metrics/ColumnFamilyMetrics.java  |   2 +-
 .../cassandra/metrics/CompactionMetrics.java    |   2 +-
 .../cassandra/service/CassandraDaemon.java      |   2 +-
 .../cassandra/tools/StandaloneScrubber.java     |   9 +-
 .../LongLeveledCompactionStrategyTest.java      |   3 +-
 test/unit/org/apache/cassandra/Util.java        |   2 +-
 .../compaction/CompactionAwareWriterTest.java   |   2 +-
 .../db/compaction/CompactionsPurgeTest.java     |   4 +-
 .../db/compaction/CompactionsTest.java          |   4 +-
 .../LeveledCompactionStrategyTest.java          |  16 +-
 .../io/sstable/SSTableMetadataTest.java         |   1 -
 .../io/sstable/SSTableRewriterTest.java         |   8 +-
 22 files changed, 533 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d8eeaf2..6bff44d 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -854,8 +854,6 @@ public final class CFMetaData
     {
         className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className;
         Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy");
-        if (className.equals(WrappingCompactionStrategy.class.getName()))
-            throw new ConfigurationException("You can't set WrappingCompactionStrategy as the compaction strategy!");
         if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass))
             throw new ConfigurationException(String.format("Specified compaction strategy class (%s) is not derived from AbstractReplicationStrategy", className));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 1e9a56d..7f6d439 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -149,7 +149,7 @@ public class CollationController
             // "hoist up" the requested data into a more recent sstable
             if (sstablesIterated > cfs.getMinimumCompactionThreshold()
                 && !cfs.isAutoCompactionDisabled()
-                && cfs.getCompactionStrategy().shouldDefragment())
+                && cfs.getCompactionStrategyManager().shouldDefragment())
             {
                 // !!WARNING!!   if we stop copying our data to a heap-managed object,
                 //               we will need to track the lifetime of this mutation as well

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 865bac9..2f6dce1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -173,7 +173,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /* These are locally held copies to be changed from the config during runtime */
     private volatile DefaultInteger minCompactionThreshold;
     private volatile DefaultInteger maxCompactionThreshold;
-    private final WrappingCompactionStrategy compactionStrategyWrapper;
+    private final CompactionStrategyManager compactionStrategyManager;
 
     public final Directories directories;
 
@@ -199,7 +199,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             for (ColumnFamilyStore cfs : concatWithIndexes())
                 cfs.maxCompactionThreshold = new DefaultInteger(metadata.getMaxCompactionThreshold());
 
-        compactionStrategyWrapper.maybeReloadCompactionStrategy(metadata);
+        compactionStrategyManager.maybeReload(metadata);
 
         scheduleFlush();
 
@@ -250,7 +250,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         try
         {
             metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass);
-            compactionStrategyWrapper.maybeReloadCompactionStrategy(metadata);
+            compactionStrategyManager.maybeReload(metadata);
         }
         catch (ConfigurationException e)
         {
@@ -348,12 +348,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             CacheService.instance.keyCache.loadSaved(this);
 
         // compaction strategy should be created after the CFS has been prepared
-        this.compactionStrategyWrapper = new WrappingCompactionStrategy(this);
+        this.compactionStrategyManager = new CompactionStrategyManager(this);
 
         if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
         {
             logger.warn("Disabling compaction strategy by setting compaction thresholds to 0 is deprecated, set the compaction option 'enabled' to 'false' instead.");
-            this.compactionStrategyWrapper.disable();
+            this.compactionStrategyManager.disable();
         }
 
         // create the private ColumnFamilyStores for the secondary column indexes
@@ -434,6 +434,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         latencyCalculator.cancel(false);
+        compactionStrategyManager.shutdown();
         SystemKeyspace.removeTruncationRecord(metadata.cfId);
         data.dropSSTables();
         indexManager.invalidate();
@@ -1499,7 +1500,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     void replaceFlushed(Memtable memtable, SSTableReader sstable)
     {
-        compactionStrategyWrapper.replaceFlushed(memtable, sstable);
+        compactionStrategyManager.replaceFlushed(memtable, sstable);
     }
 
     public boolean isValid()
@@ -1832,7 +1833,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             public List<SSTableReader> apply(View view)
             {
-                return compactionStrategyWrapper.filterSSTablesForReads(view.intervalTree.search(key));
+                return compactionStrategyManager.filterSSTablesForReads(view.intervalTree.search(key));
             }
         };
     }
@@ -1847,7 +1848,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             public List<SSTableReader> apply(View view)
             {
-                return compactionStrategyWrapper.filterSSTablesForReads(view.sstablesInBounds(rowBounds));
+                return compactionStrategyManager.filterSSTablesForReads(view.sstablesInBounds(rowBounds));
             }
         };
     }
@@ -2551,7 +2552,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
             for (ColumnFamilyStore cfs : selfWithIndexes)
-                cfs.getCompactionStrategy().pause();
+                cfs.getCompactionStrategyManager().pause();
             try
             {
                 // interrupt in-progress compactions
@@ -2582,7 +2583,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             finally
             {
                 for (ColumnFamilyStore cfs : selfWithIndexes)
-                    cfs.getCompactionStrategy().resume();
+                    cfs.getCompactionStrategyManager().resume();
             }
         }
     }
@@ -2620,7 +2621,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         // we don't use CompactionStrategy.pause since we don't want users flipping that on and off
         // during runWithCompactionsDisabled
-        this.compactionStrategyWrapper.disable();
+        compactionStrategyManager.disable();
     }
 
     public void enableAutoCompaction()
@@ -2635,7 +2636,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     @VisibleForTesting
     public void enableAutoCompaction(boolean waitForFutures)
     {
-        this.compactionStrategyWrapper.enable();
+        compactionStrategyManager.enable();
         List<Future<?>> futures = CompactionManager.instance.submitBackground(this);
         if (waitForFutures)
             FBUtilities.waitOnFutures(futures);
@@ -2643,7 +2644,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public boolean isAutoCompactionDisabled()
     {
-        return !this.compactionStrategyWrapper.isEnabled();
+        return !this.compactionStrategyManager.isEnabled();
     }
 
     /*
@@ -2655,9 +2656,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
        - get/set memtime
      */
 
-    public AbstractCompactionStrategy getCompactionStrategy()
+    public CompactionStrategyManager getCompactionStrategyManager()
     {
-        return compactionStrategyWrapper;
+        return compactionStrategyManager;
     }
 
     public void setCompactionThresholds(int minThreshold, int maxThreshold)
@@ -2745,12 +2746,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public int getUnleveledSSTables()
     {
-        return this.compactionStrategyWrapper.getUnleveledSSTables();
+        return this.compactionStrategyManager.getUnleveledSSTables();
     }
 
     public int[] getSSTableCountPerLevel()
     {
-        return compactionStrategyWrapper.getSSTableCountPerLevel();
+        return compactionStrategyManager.getSSTableCountPerLevel();
     }
 
     public static class ViewFragment

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 1d86784..cb5c54d 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -304,7 +304,7 @@ public class Keyspace
         if (cfs == null)
             return;
 
-        cfs.getCompactionStrategy().shutdown();
+        cfs.getCompactionStrategyManager().shutdown();
         CompactionManager.instance.interruptCompactionForCFs(cfs.concatWithIndexes(), true);
         // wait for any outstanding reads/writes that might affect the CFS
         cfs.keyspace.writeOrder.awaitNewBarrier();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a2783da..21c3f50 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -176,7 +176,7 @@ public class CompactionManager implements CompactionManagerMBean
         logger.debug("Scheduling a background task check for {}.{} with {}",
                      cfs.keyspace.getName(),
                      cfs.name,
-                     cfs.getCompactionStrategy().getName());
+                     cfs.getCompactionStrategyManager().getName());
         List<Future<?>> futures = new ArrayList<Future<?>>();
         // we must schedule it at least once, otherwise compaction will stop for a CF until next flush
         do {
@@ -229,7 +229,7 @@ public class CompactionManager implements CompactionManagerMBean
                     return;
                 }
 
-                AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+                CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
                 AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs));
                 if (task == null)
                 {
@@ -390,7 +390,7 @@ public class CompactionManager implements CompactionManagerMBean
             @Override
             public void execute(LifecycleTransaction txn) throws IOException
             {
-                AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
+                AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
                 task.setUserDefined(true);
                 task.setCompactionType(OperationType.UPGRADE_SSTABLES);
                 task.execute(metrics);
@@ -546,7 +546,7 @@ public class CompactionManager implements CompactionManagerMBean
         // here we compute the task off the compaction executor, so having that present doesn't
         // confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting
         // for ourselves to finish/acknowledge cancellation before continuing.
-        final Collection<AbstractCompactionTask> tasks = cfStore.getCompactionStrategy().getMaximalTask(gcBefore, splitOutput);
+        final Collection<AbstractCompactionTask> tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput);
 
         if (tasks == null)
             return Collections.emptyList();
@@ -626,7 +626,7 @@ public class CompactionManager implements CompactionManagerMBean
                 }
                 else
                 {
-                    AbstractCompactionTask task = cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore);
+                    AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore);
                     if (task != null)
                         task.execute(metrics);
                 }
@@ -1096,7 +1096,7 @@ public class CompactionManager implements CompactionManagerMBean
             MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 
             long start = System.nanoTime();
-            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
+            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range))
             {
                 CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
                 Iterator<AbstractCompactedRow> iter = ci.iterator();
@@ -1158,7 +1158,7 @@ public class CompactionManager implements CompactionManagerMBean
         logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
 
         //Group SSTables
-        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
+        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(repaired.originals());
         // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
         int antiCompactedSSTableCount = 0;
         for (Collection<SSTableReader> sstableGroup : groupedSSTables)
@@ -1206,7 +1206,7 @@ public class CompactionManager implements CompactionManagerMBean
         File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
         long repairedKeyCount = 0;
         long unrepairedKeyCount = 0;
-        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+        CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
         try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
              SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
              AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
new file mode 100644
index 0000000..38fb09f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import com.google.common.collect.Iterables;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableDeletingNotification;
+import org.apache.cassandra.notifications.SSTableListChangedNotification;
+import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * Manages the compaction strategies.
+ *
+ * Currently has two instances of actual compaction strategies - one for repaired data and one for
+ * unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ */
+public class CompactionStrategyManager implements INotificationConsumer
+{
+    protected static final String COMPACTION_ENABLED = "enabled";
+    private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
+    private final ColumnFamilyStore cfs;
+    private volatile AbstractCompactionStrategy repaired;
+    private volatile AbstractCompactionStrategy unrepaired;
+    private volatile boolean enabled = true;
+    public boolean isActive = true;
+
+    public CompactionStrategyManager(ColumnFamilyStore cfs)
+    {
+        cfs.getTracker().subscribe(this);
+        logger.debug("{} subscribed to the data tracker.", this);
+        this.cfs = cfs;
+        reload(cfs.metadata);
+        String optionValue = cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED);
+        enabled = optionValue == null || Boolean.parseBoolean(optionValue);
+    }
+
+    /**
+     * Return the next background task
+     *
+     * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
+     *
+     */
+    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    {
+        if (!isEnabled())
+            return null;
+
+        maybeReload(cfs.metadata);
+
+        if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
+        {
+            AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
+            if (repairedTask != null)
+                return repairedTask;
+            return unrepaired.getNextBackgroundTask(gcBefore);
+        }
+        else
+        {
+            AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
+            if (unrepairedTask != null)
+                return unrepairedTask;
+            return repaired.getNextBackgroundTask(gcBefore);
+        }
+    }
+
+    /**
+     * Disable compaction - used with nodetool disableautocompaction for example
+     */
+    public void disable()
+    {
+        enabled = false;
+    }
+
+    /**
+     * re-enable disabled compaction.
+     */
+    public void enable()
+    {
+        enabled = true;
+    }
+
+    public boolean isEnabled()
+    {
+        return enabled && isActive;
+    }
+
+    public synchronized void resume()
+    {
+        isActive = true;
+    }
+
+    /**
+     * pause compaction while we cancel all ongoing compactions
+     *
+     * Separate call from enable/disable to not have to save the enabled-state externally
+      */
+    public synchronized void pause()
+    {
+        isActive = false;
+    }
+
+
+    private void startup()
+    {
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
+                getCompactionStrategyFor(sstable).addSSTable(sstable);
+        }
+        repaired.startup();
+        unrepaired.startup();
+    }
+
+    /**
+     * return the compaction strategy for the given sstable
+     *
+     * returns differently based on the repaired status
+     * @param sstable
+     * @return
+     */
+    private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+    {
+        if (sstable.isRepaired())
+            return repaired;
+        else
+            return unrepaired;
+    }
+
+    public void shutdown()
+    {
+        isActive = false;
+        repaired.shutdown();
+        unrepaired.shutdown();
+    }
+
+
+    public synchronized void maybeReload(CFMetaData metadata)
+    {
+        if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass)
+                && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass)
+                && repaired.options.equals(metadata.compactionStrategyOptions) // todo: assumes all have the same options
+                && unrepaired.options.equals(metadata.compactionStrategyOptions))
+            return;
+
+        reload(metadata);
+    }
+
+    /**
+     * Reload the compaction strategies
+     *
+     * Called after changing configuration and at startup.
+     * @param metadata
+     */
+    public synchronized void reload(CFMetaData metadata)
+    {
+        if (repaired != null)
+            repaired.shutdown();
+        if (unrepaired != null)
+            unrepaired.shutdown();
+        repaired = metadata.createCompactionStrategyInstance(cfs);
+        unrepaired = metadata.createCompactionStrategyInstance(cfs);
+        startup();
+    }
+
+    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+    {
+        cfs.getTracker().replaceFlushed(memtable, sstable);
+        if (sstable != null)
+            CompactionManager.instance.submitBackground(cfs);
+    }
+
+    /**
+     * TODO: remove, unused
+     */
+    public List<SSTableReader> filterSSTablesForReads(List<SSTableReader> sstables)
+    {
+        // todo: union of filtered sstables or intersection?
+        return unrepaired.filterSSTablesForReads(repaired.filterSSTablesForReads(sstables));
+    }
+
+    public int getUnleveledSSTables()
+    {
+        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+        {
+            int count = 0;
+            count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
+            count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+            return count;
+        }
+        return 0;
+    }
+
+    public synchronized int[] getSSTableCountPerLevel()
+    {
+        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+        {
+            int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+            int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
+            res = sumArrays(res, repairedCountPerLevel);
+            int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
+            res = sumArrays(res, unrepairedCountPerLevel);
+            return res;
+        }
+        return null;
+    }
+
+    private static int[] sumArrays(int[] a, int[] b)
+    {
+        int[] res = new int[Math.max(a.length, b.length)];
+        for (int i = 0; i < res.length; i++)
+        {
+            if (i < a.length && i < b.length)
+                res[i] = a[i] + b[i];
+            else if (i < a.length)
+                res[i] = a[i];
+            else
+                res[i] = b[i];
+        }
+        return res;
+    }
+
+    public boolean shouldDefragment()
+    {
+        assert repaired.getClass().equals(unrepaired.getClass());
+        return repaired.shouldDefragment();
+    }
+
+
+    public synchronized void handleNotification(INotification notification, Object sender)
+    {
+        if (notification instanceof SSTableAddedNotification)
+        {
+            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
+            if (flushedNotification.added.isRepaired())
+                repaired.addSSTable(flushedNotification.added);
+            else
+                unrepaired.addSSTable(flushedNotification.added);
+        }
+        else if (notification instanceof SSTableListChangedNotification)
+        {
+            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+            Set<SSTableReader> repairedRemoved = new HashSet<>();
+            Set<SSTableReader> repairedAdded = new HashSet<>();
+            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
+            Set<SSTableReader> unrepairedAdded = new HashSet<>();
+
+            for (SSTableReader sstable : listChangedNotification.removed)
+            {
+                if (sstable.isRepaired())
+                    repairedRemoved.add(sstable);
+                else
+                    unrepairedRemoved.add(sstable);
+            }
+            for (SSTableReader sstable : listChangedNotification.added)
+            {
+                if (sstable.isRepaired())
+                    repairedAdded.add(sstable);
+                else
+                    unrepairedAdded.add(sstable);
+            }
+            if (!repairedRemoved.isEmpty())
+            {
+                repaired.replaceSSTables(repairedRemoved, repairedAdded);
+            }
+            else
+            {
+                for (SSTableReader sstable : repairedAdded)
+                    repaired.addSSTable(sstable);
+            }
+
+            if (!unrepairedRemoved.isEmpty())
+            {
+                unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
+            }
+            else
+            {
+                for (SSTableReader sstable : unrepairedAdded)
+                    unrepaired.addSSTable(sstable);
+            }
+        }
+        else if (notification instanceof SSTableRepairStatusChanged)
+        {
+            for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
+            {
+                if (sstable.isRepaired())
+                {
+                    unrepaired.removeSSTable(sstable);
+                    repaired.addSSTable(sstable);
+                }
+                else
+                {
+                    repaired.removeSSTable(sstable);
+                    unrepaired.addSSTable(sstable);
+                }
+            }
+        }
+        else if (notification instanceof SSTableDeletingNotification)
+        {
+            SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
+            if (sstable.isRepaired())
+                repaired.removeSSTable(sstable);
+            else
+                unrepaired.removeSSTable(sstable);
+        }
+    }
+
+    /**
+     * Create ISSTableScanner from the given sstables
+     *
+     * Delegates the call to the compaction strategies to allow LCS to create a scanner
+     * @param sstables
+     * @param range
+     * @return
+     */
+    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    {
+        List<SSTableReader> repairedSSTables = new ArrayList<>();
+        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+
+        for (SSTableReader sstable : sstables)
+        {
+            if (sstable.isRepaired())
+                repairedSSTables.add(sstable);
+            else
+                unrepairedSSTables.add(sstable);
+        }
+
+        List<ISSTableScanner> scanners = new ArrayList<>();
+
+        if (!repairedSSTables.isEmpty())
+            scanners.addAll(repaired.getScanners(repairedSSTables, range).scanners);
+        if (!unrepairedSSTables.isEmpty())
+            scanners.addAll(unrepaired.getScanners(unrepairedSSTables, range).scanners);
+
+        return new AbstractCompactionStrategy.ScannerList(scanners);
+    }
+
+    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
+    {
+        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
+    }
+
+    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
+    {
+        return getScanners(sstables, null);
+    }
+
+    public long getMaxSSTableBytes()
+    {
+        return unrepaired.getMaxSSTableBytes();
+    }
+
+    public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
+    {
+        return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+    }
+
+    public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
+    {
+        // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
+        // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
+        // sstables are marked the compactions are re-enabled
+        return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
+        {
+            @Override
+            public Collection<AbstractCompactionTask> call() throws Exception
+            {
+                synchronized (CompactionStrategyManager.this)
+                {
+                    Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
+                    Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
+
+                    if (repairedTasks == null && unrepairedTasks == null)
+                        return null;
+
+                    if (repairedTasks == null)
+                        return unrepairedTasks;
+                    if (unrepairedTasks == null)
+                        return repairedTasks;
+
+                    List<AbstractCompactionTask> tasks = new ArrayList<>();
+                    tasks.addAll(repairedTasks);
+                    tasks.addAll(unrepairedTasks);
+                    return tasks;
+                }
+            }
+        }, false);
+    }
+
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+    {
+        return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+    }
+
+    public int getEstimatedRemainingTasks()
+    {
+        int tasks = 0;
+        tasks += repaired.getEstimatedRemainingTasks();
+        tasks += unrepaired.getEstimatedRemainingTasks();
+
+        return tasks;
+    }
+
+    public boolean shouldBeEnabled()
+    {
+        String optionValue = cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED);
+        return optionValue == null || Boolean.parseBoolean(optionValue);
+    }
+
+    public String getName()
+    {
+        return unrepaired.getName();
+    }
+
+    public List<AbstractCompactionStrategy> getStrategies()
+    {
+        return Arrays.asList(repaired, unrepaired);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 7089016..17e55bf 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -105,7 +105,7 @@ public class CompactionTask extends AbstractCompactionTask
 
         // Note that the current compaction strategy, is not necessarily the one this task was created under.
         // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
-        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+        CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
 
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
@@ -159,6 +159,7 @@ public class CompactionTask extends AbstractCompactionTask
             // See CASSANDRA-8019 and CASSANDRA-8399
             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
             {
+
                 ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
                 try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
                 {
@@ -166,7 +167,7 @@ public class CompactionTask extends AbstractCompactionTask
                         collector.beginCompaction(ci);
                     long lastCheckObsoletion = start;
 
-                    if (!controller.cfs.getCompactionStrategy().isActive)
+                    if (!controller.cfs.getCompactionStrategyManager().isActive)
                         throw new CompactionInterruptedException(ci.getCompactionInfo());
 
                     try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 0d0928f..0763316 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -56,7 +56,10 @@ public class LeveledManifest
      * that level into lower level compactions
      */
     private static final int NO_COMPACTION_LIMIT = 25;
-
+    // allocate enough generations for a PB of data, with a 1-MB sstable size.  (Note that if maxSSTableSize is
+    // updated, we will still have sstables of the older, potentially smaller size.  So don't make this
+    // dependent on maxSSTableSize.)
+    public static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000);
     private final ColumnFamilyStore cfs;
     @VisibleForTesting
     protected final List<SSTableReader>[] generations;
@@ -71,18 +74,14 @@ public class LeveledManifest
         this.maxSSTableSizeInBytes = maxSSTableSizeInMB * 1024L * 1024L;
         this.options = options;
 
-        // allocate enough generations for a PB of data, with a 1-MB sstable size.  (Note that if maxSSTableSize is
-        // updated, we will still have sstables of the older, potentially smaller size.  So don't make this
-        // dependent on maxSSTableSize.)
-        int n = (int) Math.log10(1000 * 1000 * 1000);
-        generations = new List[n];
-        lastCompactedKeys = new RowPosition[n];
+        generations = new List[MAX_LEVEL_COUNT];
+        lastCompactedKeys = new RowPosition[MAX_LEVEL_COUNT];
         for (int i = 0; i < generations.length; i++)
         {
             generations[i] = new ArrayList<>();
             lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
         }
-        compactionCounter = new int[n];
+        compactionCounter = new int[MAX_LEVEL_COUNT];
     }
 
     public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables)
@@ -340,7 +339,7 @@ public class LeveledManifest
                     candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
                     if (logger.isDebugEnabled())
                         logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
-                    return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategy().getMaxSSTableBytes());
+                    return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategyManager().getMaxSSTableBytes());
                 }
                 else
                 {
@@ -355,7 +354,7 @@ public class LeveledManifest
         Collection<SSTableReader> candidates = getCandidatesFor(0);
         if (candidates.isEmpty())
             return null;
-        return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
+        return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategyManager().getMaxSSTableBytes());
     }
 
     private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index ca975b8..a0cce24 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -43,7 +43,7 @@ public class Upgrader
 
     private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
     private final CompactionController controller;
-    private final AbstractCompactionStrategy strategy;
+    private final CompactionStrategyManager strategyManager;
     private final long estimatedRows;
 
     private final OutputHandler outputHandler;
@@ -59,9 +59,9 @@ public class Upgrader
 
         this.controller = new UpgradeController(cfs);
 
-        this.strategy = cfs.getCompactionStrategy();
+        this.strategyManager = cfs.getCompactionStrategyManager();
         long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable)));
-        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategy.getMaxSSTableBytes());
+        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategyManager.getMaxSSTableBytes());
         this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
     }
 
@@ -86,7 +86,7 @@ public class Upgrader
         outputHandler.output("Upgrading " + sstable);
 
         try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true);
-             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(transaction.originals());
+             AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
              CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator())
         {
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
deleted file mode 100644
index adda0c9..0000000
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.notifications.INotification;
-import org.apache.cassandra.notifications.INotificationConsumer;
-import org.apache.cassandra.notifications.SSTableAddedNotification;
-import org.apache.cassandra.notifications.SSTableDeletingNotification;
-import org.apache.cassandra.notifications.SSTableListChangedNotification;
-import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
-
-public final class WrappingCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
-{
-    private static final Logger logger = LoggerFactory.getLogger(WrappingCompactionStrategy.class);
-    private volatile AbstractCompactionStrategy repaired;
-    private volatile AbstractCompactionStrategy unrepaired;
-    public WrappingCompactionStrategy(ColumnFamilyStore cfs)
-    {
-        super(cfs, cfs.metadata.compactionStrategyOptions);
-        reloadCompactionStrategy(cfs.metadata);
-        cfs.getTracker().subscribe(this);
-        logger.debug("{} subscribed to the data tracker.", this);
-    }
-
-    @Override
-    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
-    {
-        if (!isEnabled())
-            return null;
-
-        if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
-        {
-            AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
-            if (repairedTask != null)
-                return repairedTask;
-            return unrepaired.getNextBackgroundTask(gcBefore);
-        }
-        else
-        {
-            AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
-            if (unrepairedTask != null)
-                return unrepairedTask;
-            return repaired.getNextBackgroundTask(gcBefore);
-        }
-
-    }
-
-    @Override
-    public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, final boolean splitOutput)
-    {
-        // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
-        // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
-        // sstables are marked the compactions are re-enabled
-        return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
-        {
-            @Override
-            public Collection<AbstractCompactionTask> call() throws Exception
-            {
-                synchronized (WrappingCompactionStrategy.this)
-                {
-                    Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
-                    Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
-
-                    if (repairedTasks == null && unrepairedTasks == null)
-                        return null;
-
-                    if (repairedTasks == null)
-                        return unrepairedTasks;
-                    if (unrepairedTasks == null)
-                        return repairedTasks;
-
-                    List<AbstractCompactionTask> tasks = new ArrayList<>();
-                    tasks.addAll(repairedTasks);
-                    tasks.addAll(unrepairedTasks);
-                    return tasks;
-                }
-            }
-        }, false);
-    }
-
-    @Override
-    public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
-    {
-        assert !sstables.isEmpty();
-        boolean userDefinedInRepaired = sstables.iterator().next().isRepaired();
-        for (SSTableReader sstable : sstables)
-        {
-            if (userDefinedInRepaired != sstable.isRepaired())
-            {
-                logger.error("You can't mix repaired and unrepaired sstables in a user defined compaction");
-                return null;
-            }
-        }
-        if (userDefinedInRepaired)
-            return repaired.getUserDefinedTask(sstables, gcBefore);
-        else
-            return unrepaired.getUserDefinedTask(sstables, gcBefore);
-    }
-
-    @Override
-    public synchronized int getEstimatedRemainingTasks()
-    {
-        assert repaired.getClass().equals(unrepaired.getClass());
-        return repaired.getEstimatedRemainingTasks() + unrepaired.getEstimatedRemainingTasks();
-    }
-
-    @Override
-    public synchronized long getMaxSSTableBytes()
-    {
-        assert repaired.getClass().equals(unrepaired.getClass());
-        return unrepaired.getMaxSSTableBytes();
-    }
-
-    public synchronized void maybeReloadCompactionStrategy(CFMetaData metadata)
-    {
-        if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass)
-            && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass)
-            && repaired.options.equals(metadata.compactionStrategyOptions)
-            && unrepaired.options.equals(metadata.compactionStrategyOptions))
-            return;
-
-        reloadCompactionStrategy(metadata);
-    }
-
-    public synchronized void reloadCompactionStrategy(CFMetaData metadata)
-    {
-        if (repaired != null)
-            repaired.shutdown();
-        if (unrepaired != null)
-            unrepaired.shutdown();
-        repaired = metadata.createCompactionStrategyInstance(cfs);
-        unrepaired = metadata.createCompactionStrategyInstance(cfs);
-        startup();
-    }
-
-    public synchronized int getUnleveledSSTables()
-    {
-        if (this.repaired instanceof LeveledCompactionStrategy && this.unrepaired instanceof LeveledCompactionStrategy)
-        {
-            return ((LeveledCompactionStrategy)repaired).getLevelSize(0) + ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
-        }
-        return 0;
-    }
-
-    public synchronized int[] getSSTableCountPerLevel()
-    {
-        if (this.repaired instanceof LeveledCompactionStrategy && this.unrepaired instanceof LeveledCompactionStrategy)
-        {
-            int [] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
-            int [] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
-            return sumArrays(repairedCountPerLevel, unrepairedCountPerLevel);
-        }
-        return null;
-    }
-
-    public static int [] sumArrays(int[] a, int [] b)
-    {
-        int [] res = new int[Math.max(a.length, b.length)];
-        for (int i = 0; i < res.length; i++)
-        {
-            if (i < a.length && i < b.length)
-                res[i] = a[i] + b[i];
-            else if (i < a.length)
-                res[i] = a[i];
-            else
-                res[i] = b[i];
-        }
-        return res;
-    }
-
-    @Override
-    public boolean shouldDefragment()
-    {
-        assert repaired.getClass().equals(unrepaired.getClass());
-        return repaired.shouldDefragment();
-    }
-
-    @Override
-    public String getName()
-    {
-        assert repaired.getClass().equals(unrepaired.getClass());
-        return repaired.getName();
-    }
-
-    @Override
-    public void replaceSSTables(Collection<SSTableReader> removed, Collection<SSTableReader> added)
-    {
-        throw new UnsupportedOperationException("Can't replace sstables in the wrapping compaction strategy");
-    }
-
-    @Override
-    public void addSSTable(SSTableReader added)
-    {
-        throw new UnsupportedOperationException("Can't add sstables to the wrapping compaction strategy");
-    }
-
-    @Override
-    public void removeSSTable(SSTableReader sstable)
-    {
-        throw new UnsupportedOperationException("Can't remove sstables from the wrapping compaction strategy");
-    }
-
-    public synchronized void handleNotification(INotification notification, Object sender)
-    {
-        if (notification instanceof SSTableAddedNotification)
-        {
-            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
-            if (flushedNotification.added.isRepaired())
-                repaired.addSSTable(flushedNotification.added);
-            else
-                unrepaired.addSSTable(flushedNotification.added);
-        }
-        else if (notification instanceof SSTableListChangedNotification)
-        {
-            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
-            Set<SSTableReader> repairedRemoved = new HashSet<>();
-            Set<SSTableReader> repairedAdded = new HashSet<>();
-            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
-            Set<SSTableReader> unrepairedAdded = new HashSet<>();
-
-            for (SSTableReader sstable : listChangedNotification.removed)
-            {
-                if (sstable.isRepaired())
-                    repairedRemoved.add(sstable);
-                else
-                    unrepairedRemoved.add(sstable);
-            }
-            for (SSTableReader sstable : listChangedNotification.added)
-            {
-                if (sstable.isRepaired())
-                    repairedAdded.add(sstable);
-                else
-                    unrepairedAdded.add(sstable);
-            }
-            if (!repairedRemoved.isEmpty())
-            {
-                repaired.replaceSSTables(repairedRemoved, repairedAdded);
-            }
-            else
-            {
-                for (SSTableReader sstable : repairedAdded)
-                    repaired.addSSTable(sstable);
-            }
-
-            if (!unrepairedRemoved.isEmpty())
-            {
-                unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
-            }
-            else
-            {
-                for (SSTableReader sstable : unrepairedAdded)
-                    unrepaired.addSSTable(sstable);
-            }
-        }
-        else if (notification instanceof SSTableRepairStatusChanged)
-        {
-            for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
-            {
-                if (sstable.isRepaired())
-                {
-                    unrepaired.removeSSTable(sstable);
-                    repaired.addSSTable(sstable);
-                }
-                else
-                {
-                    repaired.removeSSTable(sstable);
-                    unrepaired.addSSTable(sstable);
-                }
-            }
-        }
-        else if (notification instanceof SSTableDeletingNotification)
-        {
-            SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
-            if (sstable.isRepaired())
-                repaired.removeSSTable(sstable);
-            else
-                unrepaired.removeSSTable(sstable);
-        }
-    }
-
-    @Override
-    public List<SSTableReader> filterSSTablesForReads(List<SSTableReader> sstables)
-    {
-        // todo: union of filtered sstables or intersection?
-        return unrepaired.filterSSTablesForReads(repaired.filterSSTablesForReads(sstables));
-    }
-
-    @Override
-    public synchronized void startup()
-    {
-        super.startup();
-        for (SSTableReader sstable : cfs.getSSTables())
-        {
-            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
-            {
-                if (sstable.isRepaired())
-                    repaired.addSSTable(sstable);
-                else
-                    unrepaired.addSSTable(sstable);
-            }
-        }
-        repaired.startup();
-        unrepaired.startup();
-    }
-
-    @Override
-    public synchronized void shutdown()
-    {
-        super.shutdown();
-        repaired.shutdown();
-        unrepaired.shutdown();
-    }
-
-    @Override
-    @SuppressWarnings("resource")
-    public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
-    {
-        List<SSTableReader> repairedSSTables = new ArrayList<>();
-        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
-        for (SSTableReader sstable : sstables)
-            if (sstable.isRepaired())
-                repairedSSTables.add(sstable);
-            else
-                unrepairedSSTables.add(sstable);
-        ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
-        ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
-        List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
-        scanners.addAll(repairedScanners.scanners);
-        scanners.addAll(unrepairedScanners.scanners);
-        return new ScannerList(scanners);
-    }
-
-    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
-    {
-        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
-    }
-
-    public List<AbstractCompactionStrategy> getWrappedStrategies()
-    {
-        return Arrays.asList(repaired, unrepaired);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index 4ab4446..6ad50d3 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -351,7 +351,7 @@ public class ColumnFamilyMetrics
         {
             public Integer getValue()
             {
-                return cfs.getCompactionStrategy().getEstimatedRemainingTasks();
+                return cfs.getCompactionStrategyManager().getEstimatedRemainingTasks();
             }
         });
         liveSSTableCount = createColumnFamilyGauge("LiveSSTableCount", new Gauge<Integer>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
index a62e3c4..20a5685 100644
--- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -61,7 +61,7 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
                 for (String keyspaceName : Schema.instance.getKeyspaces())
                 {
                     for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores())
-                        n += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
+                        n += cfs.getCompactionStrategyManager().getEstimatedRemainingTasks();
                 }
                 for (ThreadPoolExecutor collector : collectors)
                     n += collector.getTaskCount() - collector.getCompletedTaskCount();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 30a2b6e..d24c579 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -249,7 +249,7 @@ public class CassandraDaemon
             {
                 for (final ColumnFamilyStore store : cfs.concatWithIndexes())
                 {
-                    if (store.getCompactionStrategy().shouldBeEnabled())
+                    if (store.getCompactionStrategyManager().shouldBeEnabled())
                         store.enableAutoCompaction();
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index dd513b8..81b41bc 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -145,7 +145,7 @@ public class StandaloneScrubber
             }
 
             // Check (and repair) manifests
-            checkManifest(cfs.getCompactionStrategy(), cfs, sstables);
+            checkManifest(cfs.getCompactionStrategyManager(), cfs, sstables);
             CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
             SSTableDeletingTask.waitForDeletions();
             System.exit(0); // We need that to stop non daemonized threads
@@ -159,11 +159,10 @@ public class StandaloneScrubber
         }
     }
 
-    private static void checkManifest(AbstractCompactionStrategy strategy, ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
+    private static void checkManifest(CompactionStrategyManager strategyManager, ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
     {
-        WrappingCompactionStrategy wrappingStrategy = (WrappingCompactionStrategy)strategy;
-        int maxSizeInMB = (int)((cfs.getCompactionStrategy().getMaxSSTableBytes()) / (1024L * 1024L));
-        if (wrappingStrategy.getWrappedStrategies().size() == 2 && wrappingStrategy.getWrappedStrategies().get(0) instanceof LeveledCompactionStrategy)
+        int maxSizeInMB = (int)((cfs.getCompactionStrategyManager().getMaxSSTableBytes()) / (1024L * 1024L));
+        if (strategyManager.getStrategies().size() == 2 && strategyManager.getStrategies().get(0) instanceof LeveledCompactionStrategy)
         {
             System.out.println("Checking leveled manifest");
             Predicate<SSTableReader> repairedPredicate = new Predicate<SSTableReader>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index e38eb3c..cc8203e 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -63,8 +63,7 @@ public class LongLeveledCompactionStrategyTest
         ColumnFamilyStore store = keyspace.getColumnFamilyStore(cfname);
         store.disableAutoCompaction();
 
-        WrappingCompactionStrategy strategy = ((WrappingCompactionStrategy) store.getCompactionStrategy());
-        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) strategy.getWrappedStrategies().get(1);
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy)store.getCompactionStrategyManager().getStrategies().get(1);
 
         ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index b8f33d3..2d59abb 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -307,7 +307,7 @@ public class Util
     public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
     {
         int gcBefore = cfs.gcBefore(System.currentTimeMillis());
-        AbstractCompactionTask task = cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore);
+        AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore);
         task.execute(null);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 235fd49..2b6f575 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -179,7 +179,7 @@ public class CompactionAwareWriterTest
     {
         assert txn.originals().size() == 1;
         int rowsWritten = 0;
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(txn.originals()))
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(txn.originals()))
         {
             CompactionController controller = new CompactionController(cfs, txn.originals(), cfs.gcBefore(System.currentTimeMillis()));
             ISSTableScanner scanner = scanners.scanners.get(0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index e5baab6..d03c35a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -176,7 +176,7 @@ public class CompactionsPurgeTest
         rm.add(cfName, cellname(String.valueOf(5)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
         rm.applyUnsafe();
         cfs.forceBlockingFlush();
-        cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+        cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
 
         // verify that minor compaction does GC when key is provably not
         // present in a non-compacted sstable
@@ -223,7 +223,7 @@ public class CompactionsPurgeTest
         cfs.forceBlockingFlush();
 
         // compact the sstables with the c1/c2 data and the c1 tombstone
-        cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+        cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
 
         // We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone
         // sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 18418e8..a1fb33d 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -152,9 +152,9 @@ public class CompactionsTest
     public void testSingleSSTableCompactionWithLeveledCompaction() throws Exception
     {
         ColumnFamilyStore store = testSingleSSTableCompaction(LeveledCompactionStrategy.class.getCanonicalName());
-        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) store.getCompactionStrategy();
+        CompactionStrategyManager strategyManager = store.getCompactionStrategyManager();
         // tombstone removal compaction should not promote level
-        assert strategy.getSSTableCountPerLevel()[0] == 1;
+        assert strategyManager.getSSTableCountPerLevel()[0] == 1;
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index c7935fe..d85bd6a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -125,12 +125,12 @@ public class LeveledCompactionStrategyTest
         }
 
         waitForLeveling(cfs);
-        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy();
+        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
         // Checking we're not completely bad at math
         assert strategy.getSSTableCountPerLevel()[1] > 0;
         assert strategy.getSSTableCountPerLevel()[2] > 0;
 
-        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(cfs.getSSTables());
+        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(cfs.getSSTables());
         for (Collection<SSTableReader> sstableGroup : groupedSSTables)
         {
             int groupLevel = -1;
@@ -176,7 +176,7 @@ public class LeveledCompactionStrategyTest
         }
 
         waitForLeveling(cfs);
-        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy();
+        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
         // Checking we're not completely bad at math
         assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
         assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
@@ -195,7 +195,7 @@ public class LeveledCompactionStrategyTest
      */
     private void waitForLeveling(ColumnFamilyStore cfs) throws InterruptedException
     {
-        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy();
+        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
         // L0 is the lowest priority, so when that's done, we know everything is done
         while (strategy.getSSTableCountPerLevel()[0] > 1)
             Thread.sleep(100);
@@ -223,7 +223,7 @@ public class LeveledCompactionStrategyTest
         }
 
         waitForLeveling(cfs);
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ((WrappingCompactionStrategy) cfs.getCompactionStrategy()).getWrappedStrategies().get(1);
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1);
         assert strategy.getLevelSize(1) > 0;
 
         // get LeveledScanner for level 1 sstables
@@ -262,7 +262,7 @@ public class LeveledCompactionStrategyTest
         }
         waitForLeveling(cfs);
         cfs.forceBlockingFlush();
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ((WrappingCompactionStrategy) cfs.getCompactionStrategy()).getWrappedStrategies().get(1);
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1);
         cfs.disableAutoCompaction();
 
         while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
@@ -314,8 +314,8 @@ public class LeveledCompactionStrategyTest
         while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
             Thread.sleep(100);
 
-        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy();
-        List<AbstractCompactionStrategy> strategies = strategy.getWrappedStrategies();
+        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
+        List<AbstractCompactionStrategy> strategies = strategy.getStrategies();
         LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0);
         LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1);
         assertEquals(0, repaired.manifest.getLevelCount() );

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
index 755225e..f952b91 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
@@ -201,7 +201,6 @@ public class SSTableMetadataTest
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard3");
-        store.getCompactionStrategy();
         for (int j = 0; j < 8; j++)
         {
             DecoratedKey key = Util.dk("row"+j);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 9e1cb91..8f6ec16 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -100,7 +100,7 @@ public class SSTableRewriterTest extends SchemaLoader
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
         assertEquals(sstables.iterator().next().bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.getCount());
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
              SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);)
         {
@@ -132,7 +132,7 @@ public class SSTableRewriterTest extends SchemaLoader
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
 
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
              SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000);)
         {
@@ -167,7 +167,7 @@ public class SSTableRewriterTest extends SchemaLoader
         assertEquals(1, sstables.size());
 
         boolean checked = false;
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
              SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000))
         {
@@ -804,7 +804,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         Set<SSTableReader> sstables = Sets.newHashSet(s);
         assertEquals(1, sstables.size());
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
              SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, false);
              SSTableRewriter writer2 = new SSTableRewriter(cfs, txn, 1000, false, false))


Mime
View raw message