cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject git commit: Replace compaction lock with runWithCompactionsDisabled patch by jbellis; reviewed by slebresne for CASSANDRA-3430
Date Thu, 21 Mar 2013 13:59:09 GMT
Updated Branches:
  refs/heads/trunk c47f4070d -> d72e9381f


Replace compaction lock with runWithCompactionsDisabled
patch by jbellis; reviewed by slebresne for CASSANDRA-3430


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d72e9381
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d72e9381
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d72e9381

Branch: refs/heads/trunk
Commit: d72e9381fa8f992e182b415d6060128b567808be
Parents: c47f407
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Mon Feb 18 15:43:00 2013 -0600
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Thu Mar 21 09:59:02 2013 -0400

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 NEWS.txt                                           |    5 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  175 +++++++--
 src/java/org/apache/cassandra/db/DefsTable.java    |   33 +--
 src/java/org/apache/cassandra/db/SystemTable.java  |   26 +--
 src/java/org/apache/cassandra/db/Table.java        |   13 +-
 .../apache/cassandra/db/TruncateVerbHandler.java   |    2 +-
 .../db/compaction/AbstractCompactionStrategy.java  |   36 ++-
 .../cassandra/db/compaction/CompactionManager.java |  288 ++++-----------
 .../cassandra/db/compaction/CompactionTask.java    |   23 +-
 .../db/compaction/LeveledCompactionStrategy.java   |    4 +-
 .../compaction/SizeTieredCompactionStrategy.java   |   17 +-
 .../AbstractSimplePerColumnSecondaryIndex.java     |   15 +-
 .../apache/cassandra/db/index/SecondaryIndex.java  |   17 +-
 .../apache/cassandra/service/MigrationManager.java |   70 ++---
 .../apache/cassandra/service/StorageService.java   |    3 +-
 .../db/compaction/LongCompactionsTest.java         |    4 +-
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |   10 +-
 .../cassandra/db/RecoveryManagerTruncateTest.java  |    2 +-
 .../cassandra/db/SecondaryIndexColumnSizeTest.java |    4 +-
 .../cassandra/db/compaction/CompactionsTest.java   |    3 +-
 .../compaction/LeveledCompactionStrategyTest.java  |   12 +-
 22 files changed, 337 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3edf335..0f9c4f1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0
+ * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
  * Change Message IDs to ints (CASSANDRA-5307)
  * Move sstable level information into the Stats component, removing the
    need for a separate Manifest file (CASSANDRA-4872)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index d4b148b..d796d39 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -23,6 +23,11 @@ Upgrading
     - authority option in cassandra.yaml has been deprecated since 1.2.0,
       but it has been completely removed in 2.0. Please use 'authorizer' option.
 
+Operations
+----------
+    - Major compactions, cleanup, scrub, and upgradesstables will interrupt 
+      any in-progress compactions (but not repair validations) when invoked.
+
 
 1.2.3
 =====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4de1ebd..41db48f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -28,13 +28,15 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import javax.management.*;
 
+import com.google.common.base.Function;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 
-import org.apache.cassandra.db.compaction.LeveledManifest;
+import org.apache.cassandra.db.compaction.*;
+
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,10 +53,6 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -146,17 +144,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (metadata.compactionStrategyClass.equals(compactionStrategy.getClass()) && metadata.compactionStrategyOptions.equals(compactionStrategy.options))
             return;
 
-        // TODO is there a way to avoid locking here?
-        CompactionManager.instance.getCompactionLock().lock();
-        try
+        // synchronize vs runWithCompactionsDisabled calling pause/resume.  otherwise, letting old compactions
+        // finish should be harmless and possibly useful.
+        synchronized (this)
         {
             compactionStrategy.shutdown();
             compactionStrategy = metadata.createCompactionStrategyInstance(this);
         }
-        finally
-        {
-            CompactionManager.instance.getCompactionLock().unlock();
-        }
     }
 
     void scheduleFlush()
@@ -802,9 +796,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return switchMemtable(true, false);
     }
 
-    public void forceBlockingFlush() throws ExecutionException, InterruptedException
+    public void forceBlockingFlush()
     {
-        forceFlush().get();
+        try
+        {
+            forceFlush().get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public void maybeUpdateRowCache(DecoratedKey key, ColumnFamily columnFamily)
@@ -1611,19 +1616,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public void snapshot(String snapshotName)
     {
-        try
-        {
-            forceBlockingFlush();
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-
+        forceBlockingFlush();
         snapshotWithoutFlush(snapshotName);
     }
 
@@ -1752,7 +1745,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return a Future to the delete operation. Call the future's get() to make
      * sure the column family has been deleted
      */
-    public Future<?> truncate() throws ExecutionException, InterruptedException
+    public void truncateBlocking()
     {
         // We have two goals here:
         // - truncate should delete everything written before truncate was invoked
@@ -1763,9 +1756,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         //
         // Bonus complication: since we store replay position in sstable metadata,
         // truncating those sstables means we will replay any CL segments from the
-        // beginning if we restart before they are discarded for normal reasons
-        // post-truncate.  So we need to create a "dummy" sstable containing
-        // only the replay position.  This is done by CompactionManager.submitTruncate.
+        // beginning if we restart before they [the CL segments] are discarded for
+        // normal reasons post-truncate.  To prevent this, we store truncation
+        // position in the System keyspace.
         logger.debug("truncating {}", name);
 
         if (DatabaseDescriptor.isAutoSnapshot())
@@ -1809,11 +1802,115 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
         }
 
-        long truncatedAt = System.currentTimeMillis();
-        if (DatabaseDescriptor.isAutoSnapshot())
-            snapshot(Table.getTimestampedSnapshotName(name));
+        Runnable truncateRunnable = new Runnable()
+        {
+            public void run()
+            {
+                logger.debug("Discarding sstable data for truncated CF + indexes");
+
+                final long truncatedAt = System.currentTimeMillis();
+                if (DatabaseDescriptor.isAutoSnapshot())
+                    snapshot(Table.getTimestampedSnapshotName(name));
+
+                ReplayPosition replayAfter = discardSSTables(truncatedAt);
+
+                for (SecondaryIndex index : indexManager.getIndexes())
+                    index.truncateBlocking(truncatedAt);
+
+                SystemTable.saveTruncationPosition(ColumnFamilyStore.this, replayAfter);
+
+                logger.debug("cleaning out row cache");
+                for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+                {
+                    if (key.cfId == metadata.cfId)
+                        CacheService.instance.rowCache.remove(key);
+                }
+            }
+        };
+
+        runWithCompactionsDisabled(Executors.callable(truncateRunnable), true);
+        logger.debug("truncate complete");
+    }
+
+    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
+    {
+        // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
+        // and so we only run one major compaction at a time
+        synchronized (this)
+        {
+            logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
+
+            Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
+            for (ColumnFamilyStore cfs : selfWithIndexes)
+                cfs.getCompactionStrategy().pause();
+            try
+            {
+                // interrupt in-progress compactions
+                Function<ColumnFamilyStore, CFMetaData> f = new Function<ColumnFamilyStore, CFMetaData>()
+                {
+                    public CFMetaData apply(ColumnFamilyStore cfs)
+                    {
+                        return cfs.metadata;
+                    }
+                };
+                Iterable<CFMetaData> allMetadata = Iterables.transform(selfWithIndexes, f);
+                CompactionManager.instance.interruptCompactionFor(allMetadata, interruptValidation);
+
+                // wait for the interruption to be recognized
+                long start = System.currentTimeMillis();
+                while (System.currentTimeMillis() < start + 60000)
+                {
+                    if (CompactionManager.instance.isCompacting(selfWithIndexes))
+                        FBUtilities.sleep(100);
+                    else
+                        break;
+                }
+
+                // doublecheck that we finished, instead of timing out
+                for (ColumnFamilyStore cfs : selfWithIndexes)
+                {
+                    if (!cfs.getDataTracker().getCompacting().isEmpty())
+                    {
+                        logger.warn("Unable to cancel in-progress compactions for {}.  Probably there is an unusually large row in progress somewhere.  It is also possible that buggy code left some sstables compacting after it was done with them", metadata.cfName);
+                    }
+                }
+                logger.debug("Compactions successfully cancelled");
+
+                // run our task
+                try
+                {
+                    return callable.call();
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            finally
+            {
+                for (ColumnFamilyStore cfs : selfWithIndexes)
+                    cfs.getCompactionStrategy().resume();
+            }
+        }
+    }
+
+    public Collection<SSTableReader> markAllCompacting()
+    {
+        Callable<Collection<SSTableReader>> callable = new Callable<Collection<SSTableReader>>()
+        {
+            public Collection<SSTableReader> call() throws Exception
+            {
+                assert data.getCompacting().isEmpty() : data.getCompacting();
+                Collection<SSTableReader> sstables = AbstractCompactionStrategy.filterSuspectSSTables(getSSTables());
+                if (sstables.isEmpty())
+                    return null;
+                boolean success = data.markCompacting(sstables);
+                assert success : "something marked things compacting while compactions are disabled";
+                return sstables;
+            }
+        };
 
-        return CompactionManager.instance.submitTruncate(this, truncatedAt);
+        return runWithCompactionsDisabled(callable, false);
     }
 
     public long getBloomFilterFalsePositives()
@@ -1852,6 +1949,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public void disableAutoCompaction()
     {
+        // we don't use CompactionStrategy.pause since we don't want users flipping that on and off
+        // during runWithCompactionsDisabled
         minCompactionThreshold.set(0);
         maxCompactionThreshold.set(0);
     }
@@ -2070,7 +2169,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     /**
-     * Discard all SSTables that were created before given timestamp. Caller is responsible to obtain compactionLock.
+     * Discard all SSTables that were created before given timestamp.
+     *
+     * Caller should first ensure that compactions have quiesced.
      *
      * @param truncatedAt The timestamp of the truncation
      *                    (all SSTables before that timestamp are going be marked as compacted)
@@ -2079,6 +2180,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public ReplayPosition discardSSTables(long truncatedAt)
     {
+        assert data.getCompacting().isEmpty() : data.getCompacting();
+
         List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
 
         for (SSTableReader sstable : getSSTables())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index 3c8f61c..8d47b7b 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -210,21 +208,9 @@ public class DefsTable
 
         logger.info("Fixing timestamps of schema ColumnFamily " + columnFamily + "...");
 
-        try
-        {
-            cfs.truncate().get();
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        cfs.truncateBlocking();
 
         long microTimestamp = now.getTime() * 1000;
-
         for (Row row : rows)
         {
             if (Schema.invalidSchemaRow(row))
@@ -241,18 +227,7 @@ public class DefsTable
             mutation.apply();
         }
         // flush immediately because we read schema before replaying the commitlog
-        try
-        {
-            cfs.forceBlockingFlush();
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException("Could not flush after fixing schema timestamps", e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        cfs.forceBlockingFlush();
     }
 
     public static ByteBuffer searchComposite(String name, boolean start)
@@ -557,7 +532,7 @@ public class DefsTable
         KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
         String snapshotName = Table.getTimestampedSnapshotName(ksName);
 
-        CompactionManager.instance.stopCompactionFor(ksm.cfMetaData().values());
+        CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
 
         // remove all cfs from the table instance.
         for (CFMetaData cfm : ksm.cfMetaData().values())
@@ -596,7 +571,7 @@ public class DefsTable
         Schema.instance.purge(cfm);
         Schema.instance.setTableDefinition(makeNewKeyspaceDefinition(ksm, cfm));
 
-        CompactionManager.instance.stopCompactionFor(Arrays.asList(cfm));
+        CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
 
         if (!StorageService.instance.isClientMode())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index c6a6e59..95c7298 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -172,14 +172,14 @@ public class SystemTable
             String req = "INSERT INTO system.%s (key, cluster_name, tokens, bootstrapped) VALUES ('%s', '%s', %s, '%s')";
             processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, clusterName, tokenBytes, BootstrapState.COMPLETED.name()));
 
-            oldStatusCfs.truncate();
+            oldStatusCfs.truncateBlocking();
         }
 
         ColumnFamilyStore oldHintsCfs = table.getColumnFamilyStore(OLD_HINTS_CF);
         if (oldHintsCfs.getSSTables().size() > 0)
         {
             logger.info("Possible old-format hints found. Truncating");
-            oldHintsCfs.truncate();
+            oldHintsCfs.truncateBlocking();
         }
     }
 
@@ -241,14 +241,7 @@ public class SystemTable
     public static void discardCompactionsInProgress()
     {
         ColumnFamilyStore compactionLog = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
-        try
-        {
-            compactionLog.truncate().get();
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        compactionLog.truncateBlocking();
     }
 
     public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position)
@@ -408,18 +401,7 @@ public class SystemTable
 
     private static void forceBlockingFlush(String cfname)
     {
-        try
-        {
-            Table.open(Table.SYSTEM_KS).getColumnFamilyStore(cfname).forceBlockingFlush();
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        Table.open(Table.SYSTEM_KS).getColumnFamilyStore(cfname).forceBlockingFlush();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 7dafa4d..2366d3d 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -299,18 +299,7 @@ public class Table
     // disassociate a cfs from this table instance.
     private void unloadCf(ColumnFamilyStore cfs) throws IOException
     {
-        try
-        {
-            cfs.forceBlockingFlush();
-        }
-        catch (ExecutionException e)
-        {
-            throw new IOException(e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new IOException(e);
-        }
+        cfs.forceBlockingFlush();
         cfs.invalidate();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index 73825c3..6ebedc7 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -37,7 +37,7 @@ public class TruncateVerbHandler implements IVerbHandler<Truncation>
         try
         {
             ColumnFamilyStore cfs = Table.open(t.keyspace).getColumnFamilyStore(t.columnFamily);
-            cfs.truncate().get();
+            cfs.truncateBlocking();
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 8668628..a6f4abe 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -53,6 +53,18 @@ public abstract class AbstractCompactionStrategy
     protected float tombstoneThreshold;
     protected long tombstoneCompactionInterval;
 
+    /**
+     * pause/resume/getNextBackgroundTask must synchronize.  This guarantees that after pause completes,
+     * no new tasks will be generated; or put another way, pause can't run until in-progress tasks are
+     * done being created.
+     *
+     * This allows runWithCompactionsDisabled to be confident that after pausing, once in-progress
+     * tasks abort, it's safe to proceed with truncate/cleanup/etc.
+     *
+     * See CASSANDRA-3430
+     */
+    protected boolean isActive = true;
+
     protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {
         assert cfs != null;
@@ -78,10 +90,30 @@ public abstract class AbstractCompactionStrategy
     }
 
     /**
+     * For internal, temporary suspension of background compactions so that we can do exceptional
+     * things like truncate or major compaction
+     */
+    public synchronized void pause()
+    {
+        isActive = false;
+    }
+
+    /**
+     * For internal, temporary suspension of background compactions so that we can do exceptional
+     * things like truncate or major compaction
+     */
+    public synchronized void resume()
+    {
+        isActive = true;
+    }
+
+    /**
      * Releases any resources if this strategy is shutdown (when the CFS is reloaded after a schema change).
-     * Default is to do nothing.
      */
-    public void shutdown() { }
+    public void shutdown()
+    {
+        isActive = false;
+    }
 
     /**
      * @param gcBefore throw away tombstones older than this

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index dd4c4d4..ca0e945 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -22,12 +22,9 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.primitives.Longs;
@@ -35,7 +32,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.AutoSavingCache;
-import org.apache.cassandra.cache.RowCacheKey;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.CFMetaData;
@@ -43,9 +39,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
-import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
@@ -53,9 +47,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
-import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.AntiEntropyService;
-import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.CounterId;
@@ -63,10 +55,8 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 /**
- * A singleton which manages a private executor of ongoing compactions. A readwrite lock
- * controls whether compactions can proceed: an external consumer can completely stop
- * compactions by acquiring the write half of the lock via getCompactionLock().
- *
+ * A singleton which manages a private executor of ongoing compactions.
+ * <p/>
  * Scheduling for compaction is accomplished by swapping sstables to be compacted into
  * a set via DataTracker. New scheduling attempts will ignore currently compacting
  * sstables.
@@ -82,7 +72,8 @@ public class CompactionManager implements CompactionManagerMBean
 
     // A thread local that tells us if the current thread is owned by the compaction manager. Used
     // by CounterContext to figure out if it should log a warning for invalid counter shards.
-    public static final ThreadLocal<Boolean> isCompactionManager = new ThreadLocal<Boolean>() {
+    public static final ThreadLocal<Boolean> isCompactionManager = new ThreadLocal<Boolean>()
+    {
         @Override
         protected Boolean initialValue()
         {
@@ -90,17 +81,6 @@ public class CompactionManager implements CompactionManagerMBean
         }
     };
 
-    /**
-     * compactionLock has two purposes:
-     * - "Special" compactions will acquire writelock instead of readlock to make sure that all
-     * other compaction activity is quiesced and they can grab ALL the sstables to do something.
-     * - Some schema migrations cannot run concurrently with compaction.  (Currently, this is
-     *   only when changing compaction strategy -- see CFS.maybeReloadCompactionStrategy.)
-     *
-     * TODO this is too big a hammer -- we should only care about quiescing all for the given CFS.
-     */
-    private final ReentrantReadWriteLock compactionLock = new ReentrantReadWriteLock();
-
     static
     {
         instance = new CompactionManager();
@@ -121,14 +101,6 @@ public class CompactionManager implements CompactionManagerMBean
     private final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
 
     /**
-     * @return A lock, for which acquisition means no compactions can run.
-     */
-    public Lock getCompactionLock()
-    {
-        return compactionLock.writeLock();
-    }
-
-    /**
      * Call this whenever a compaction might be needed on the given columnfamily.
      * It's okay to over-call (within reason) since the compactions are single-threaded,
      * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
@@ -139,14 +111,14 @@ public class CompactionManager implements CompactionManagerMBean
         if (count > 0 && executor.getActiveCount() >= executor.getMaximumPoolSize())
         {
             logger.debug("Background compaction is still running for {}.{} ({} remaining). Skipping",
-                         new Object[] { cfs.table.getName(), cfs.name, count});
+                         cfs.table.getName(), cfs.name, count);
             return Collections.emptyList();
         }
 
         logger.debug("Scheduling a background task check for {}.{} with {}",
-                     new Object[] { cfs.table.getName(),
-                                   cfs.name,
-                                   cfs.getCompactionStrategy().getClass().getSimpleName()});
+                     cfs.table.getName(),
+                     cfs.name,
+                     cfs.getCompactionStrategy().getClass().getSimpleName());
         List<Future<?>> futures = new ArrayList<Future<?>>();
         // if we have room for more compactions, then fill up executor
         while (executor.getActiveCount() + futures.size() < executor.getMaximumPoolSize())
@@ -157,6 +129,14 @@ public class CompactionManager implements CompactionManagerMBean
         return futures;
     }
 
+    public boolean isCompacting(Iterable<ColumnFamilyStore> cfses)
+    {
+        for (ColumnFamilyStore cfs : cfses)
+            if (!cfs.getDataTracker().getCompacting().isEmpty())
+                return true;
+        return false;
+    }
+
     // the actual sstables to compact are not determined until we run the BCT; that way, if new sstables
     // are created between task submission and execution, we execute against the most up-to-date information
     class BackgroundCompactionTask implements Runnable
@@ -170,10 +150,9 @@ public class CompactionManager implements CompactionManagerMBean
 
         public void run()
         {
-            compactionLock.readLock().lock();
             try
             {
-                logger.debug("Checking {}.{}", cfs.table.getName(), cfs.name); // log after we get the lock so we can see delays from that if any
+                logger.debug("Checking {}.{}", cfs.table.getName(), cfs.name);
                 if (!cfs.isValid())
                 {
                     logger.debug("Aborting compaction for dropped CF");
@@ -192,7 +171,6 @@ public class CompactionManager implements CompactionManagerMBean
             finally
             {
                 compactingCF.remove(cfs);
-                compactionLock.readLock().unlock();
             }
             submitBackground(cfs);
         }
@@ -205,49 +183,14 @@ public class CompactionManager implements CompactionManagerMBean
 
     private void performAllSSTableOperation(final ColumnFamilyStore cfs, final AllSSTablesOperation operation) throws InterruptedException, ExecutionException
     {
+        final Collection<SSTableReader> sstables = cfs.markAllCompacting();
         Callable<Object> runnable = new Callable<Object>()
         {
             public Object call() throws IOException
             {
-                compactionLock.writeLock().lock();
-                try
-                {
-                    Collection<SSTableReader> sstables;
-                    while (true)
-                    {
-                        sstables = cfs.getDataTracker().getUncompactingSSTables();
-                        if (sstables.isEmpty())
-                            return this;
-                        if (cfs.getDataTracker().markCompacting(sstables))
-                            break;
-                    }
-
-                    try
-                    {
-                        // downgrade the lock acquisition
-                        compactionLock.readLock().lock();
-                        compactionLock.writeLock().unlock();
-                        try
-                        {
-                            operation.perform(cfs, sstables);
-                        }
-                        finally
-                        {
-                            compactionLock.readLock().unlock();
-                        }
-                    }
-                    finally
-                    {
-                        cfs.getDataTracker().unmarkCompacting(sstables);
-                    }
-                    return this;
-                }
-                finally
-                {
-                    // we probably already downgraded
-                    if (compactionLock.writeLock().isHeldByCurrentThread())
-                        compactionLock.writeLock().unlock();
-                }
+                operation.perform(cfs, sstables);
+                cfs.getDataTracker().unmarkCompacting(sstables);
+                return this;
             }
         };
         executor.submit(runnable).get();
@@ -312,35 +255,17 @@ public class CompactionManager implements CompactionManagerMBean
 
     public Future<?> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
     {
+        // here we compute the task off the compaction executor, so having that present doesn't
+        // confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting
+        // for ourselves to finish/acknowledge cancellation before continuing.
+        final AbstractCompactionTask task = cfStore.getCompactionStrategy().getMaximalTask(gcBefore);
         Runnable runnable = new WrappedRunnable()
         {
             protected void runMayThrow() throws IOException
             {
-                // acquire the write lock long enough to schedule all sstables
-                compactionLock.writeLock().lock();
-                try
-                {
-                    AbstractCompactionTask task = cfStore.getCompactionStrategy().getMaximalTask(gcBefore);
-                    if (task == null)
-                        return;
-                    // downgrade the lock acquisition
-                    compactionLock.readLock().lock();
-                    compactionLock.writeLock().unlock();
-                    try
-                    {
-                        task.execute(metrics);
-                    }
-                    finally
-                    {
-                        compactionLock.readLock().unlock();
-                    }
-                }
-                finally
-                {
-                    // we probably already downgraded
-                    if (compactionLock.writeLock().isHeldByCurrentThread())
-                        compactionLock.writeLock().unlock();
-                }
+                if (task == null)
+                    return;
+                task.execute(metrics);
             }
         };
         return executor.submit(runnable);
@@ -380,40 +305,32 @@ public class CompactionManager implements CompactionManagerMBean
         {
             protected void runMayThrow() throws IOException
             {
-                compactionLock.readLock().lock();
-                try
+                // look up the sstables now that we're on the compaction executor, so we don't try to re-compact
+                // something that was already being compacted earlier.
+                Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(dataFiles.size());
+                for (Descriptor desc : dataFiles)
                 {
-                    // look up the sstables now that we're on the compaction executor, so we don't try to re-compact
-                    // something that was already being compacted earlier.
-                    Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(dataFiles.size());
-                    for (Descriptor desc : dataFiles)
-                    {
-                        // inefficient but not in a performance sensitive path
-                        SSTableReader sstable = lookupSSTable(cfs, desc);
-                        if (sstable == null)
-                        {
-                            logger.info("Will not compact {}: it is not an active sstable", desc);
-                        }
-                        else
-                        {
-                            sstables.add(sstable);
-                        }
-                    }
-
-                    if (sstables.isEmpty())
+                    // inefficient but not in a performance sensitive path
+                    SSTableReader sstable = lookupSSTable(cfs, desc);
+                    if (sstable == null)
                     {
-                        logger.info("No files to compact for user defined compaction");
+                        logger.info("Will not compact {}: it is not an active sstable", desc);
                     }
                     else
                     {
-                        AbstractCompactionTask task = cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore);
-                        if (task != null)
-                            task.execute(metrics);
+                        sstables.add(sstable);
                     }
                 }
-                finally
+
+                if (sstables.isEmpty())
                 {
-                    compactionLock.readLock().unlock();
+                    logger.info("No files to compact for user defined compaction");
+                }
+                else
+                {
+                    AbstractCompactionTask task = cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore);
+                    if (task != null)
+                        task.execute(metrics);
                 }
             }
         };
@@ -445,16 +362,8 @@ public class CompactionManager implements CompactionManagerMBean
         {
             public Object call() throws IOException
             {
-                compactionLock.readLock().lock();
-                try
-                {
-                    doValidationCompaction(cfStore, validator);
-                    return this;
-                }
-                finally
-                {
-                    compactionLock.readLock().unlock();
-                }
+                doValidationCompaction(cfStore, validator);
+                return this;
             }
         };
         return validationExecutor.submit(callable);
@@ -543,9 +452,9 @@ public class CompactionManager implements CompactionManagerMBean
             long totalkeysWritten = 0;
 
             int expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(),
-                                                   (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable),cfs.metadata)));
+                                                   (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable), cfs.metadata)));
             if (logger.isDebugEnabled())
-              logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+                logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
             SSTableWriter writer = null;
             SSTableReader newSstable = null;
@@ -593,12 +502,12 @@ public class CompactionManager implements CompactionManagerMBean
                                 OnDiskAtom column = row.next();
                                 if (column instanceof CounterColumn)
                                     renewer.maybeRenew((CounterColumn) column);
-                                if (column instanceof Column && cfs.indexManager.indexes((Column)column))
+                                if (column instanceof Column && cfs.indexManager.indexes((Column) column))
                                 {
                                     if (indexedColumnsInRow == null)
                                         indexedColumnsInRow = new ArrayList<Column>();
 
-                                    indexedColumnsInRow.add((Column)column);
+                                    indexedColumnsInRow.add((Column) column);
                                 }
                             }
 
@@ -645,8 +554,8 @@ public class CompactionManager implements CompactionManagerMBean
                 long dTime = System.currentTimeMillis() - startTime;
                 long startsize = sstable.onDiskLength();
                 long endsize = newSstable.onDiskLength();
-                double ratio = (double)endsize / (double)startsize;
-                logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int)(ratio*100), totalkeysWritten, dTime));
+                double ratio = (double) endsize / (double) startsize;
+                logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
             }
 
             // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
@@ -696,23 +605,12 @@ public class CompactionManager implements CompactionManagerMBean
             // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
             // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
             // 'as good as in the non-snapshot' case)
-            gcBefore = (int)(cfs.getSnapshotCreationTime(validator.request.sessionid) / 1000) - cfs.metadata.getGcGraceSeconds();
+            gcBefore = (int) (cfs.getSnapshotCreationTime(validator.request.sessionid) / 1000) - cfs.metadata.getGcGraceSeconds();
         }
         else
         {
             // flush first so everyone is validating data that is as similar as possible
-            try
-            {
-                StorageService.instance.forceTableFlush(cfs.table.getName(), cfs.name);
-            }
-            catch (ExecutionException e)
-            {
-                throw new IOException(e);
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
+            StorageService.instance.forceTableFlush(cfs.table.getName(), cfs.name);
 
             // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
             // instead so they won't be cleaned up if they do get compacted during the validation
@@ -759,34 +657,19 @@ public class CompactionManager implements CompactionManagerMBean
         {
             public void run()
             {
-                compactionLock.readLock().lock();
+                metrics.beginCompaction(builder);
                 try
                 {
-                    metrics.beginCompaction(builder);
-                    try
-                    {
-                        builder.build();
-                    }
-                    finally
-                    {
-                        metrics.finishCompaction(builder);
-                    }
+                    builder.build();
                 }
                 finally
                 {
-                    compactionLock.readLock().unlock();
+                    metrics.finishCompaction(builder);
                 }
             }
         };
 
-        // don't submit to the executor if the compaction lock is held by the current thread. Instead return a simple
-        // future that will be immediately immediately get()ed and executed. Happens during a migration, which locks
-        // the compaction thread and then reinitializes a ColumnFamilyStore. Under normal circumstances, CFS spawns
-        // index jobs to the compaction manager (this) and blocks on them.
-        if (compactionLock.isWriteLockedByCurrentThread())
-            return new SimpleFuture(runnable);
-        else
-            return executor.submit(runnable);
+        return executor.submit(runnable);
     }
 
     public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer)
@@ -821,39 +704,6 @@ public class CompactionManager implements CompactionManagerMBean
         return executor.submit(runnable);
     }
 
-    public Future<?> submitTruncate(final ColumnFamilyStore main, final long truncatedAt)
-    {
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                compactionLock.writeLock().lock();
-
-                try
-                {
-                    ReplayPosition replayAfter = main.discardSSTables(truncatedAt);
-
-                    for (SecondaryIndex index : main.indexManager.getIndexes())
-                        index.truncate(truncatedAt);
-
-                    SystemTable.saveTruncationPosition(main, replayAfter);
-
-                    for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
-                    {
-                        if (key.cfId == main.metadata.cfId)
-                            CacheService.instance.rowCache.remove(key);
-                    }
-                }
-                finally
-                {
-                    compactionLock.writeLock().unlock();
-                }
-            }
-        };
-
-        return executor.submit(runnable);
-    }
-
     static int getDefaultGcBefore(ColumnFamilyStore cfs)
     {
         // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
@@ -911,7 +761,6 @@ public class CompactionManager implements CompactionManagerMBean
 
     private static class CompactionExecutor extends ThreadPoolExecutor
     {
-
         protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
         {
             super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new NamedThreadFactory(name, Thread.MIN_PRIORITY));
@@ -939,7 +788,7 @@ public class CompactionManager implements CompactionManagerMBean
         @Override
         public void afterExecute(Runnable r, Throwable t)
         {
-            super.afterExecute(r,t);
+            super.afterExecute(r, t);
 
             if (t == null)
                 t = DebuggableThreadPoolExecutor.extractThrowable(r);
@@ -970,6 +819,7 @@ public class CompactionManager implements CompactionManagerMBean
     public interface CompactionExecutorStatsCollector
     {
         void beginCompaction(CompactionInfo.Holder ci);
+
         void finishCompaction(CompactionInfo.Holder ci);
     }
 
@@ -1052,6 +902,7 @@ public class CompactionManager implements CompactionManagerMBean
     {
         private final SSTableReader sstable;
         private final SSTableScanner scanner;
+
         public CleanupInfo(SSTableReader sstable, SSTableScanner scanner)
         {
             this.sstable = sstable;
@@ -1126,19 +977,26 @@ public class CompactionManager implements CompactionManagerMBean
 
     /**
      * Try to stop all of the compactions for given ColumnFamilies.
-     * Note that this method does not wait indefinitely for all compactions to finish, maximum wait time is 30 secs.
+     * 
+     * Note that this method does not wait for all compactions to finish; you'll need to loop against
+     * isCompacting if you want that behavior.
      *
      * @param columnFamilies The ColumnFamilies to try to stop compaction upon.
+     * @param interruptValidation true if validation operations for repair should also be interrupted
+     *
      */
-    public void stopCompactionFor(Collection<CFMetaData> columnFamilies)
+    public void interruptCompactionFor(Iterable<CFMetaData> columnFamilies, boolean interruptValidation)
     {
         assert columnFamilies != null;
 
+        // interrupt in-progress compactions
         for (Holder compactionHolder : CompactionMetrics.getCompactions())
         {
             CompactionInfo info = compactionHolder.getCompactionInfo();
+            if ((info.getTaskType() == OperationType.VALIDATION) && !interruptValidation)
+                continue;
 
-            if (columnFamilies.contains(info.getCFMetaData()))
+            if (Iterables.contains(columnFamilies, info.getCFMetaData()))
                 compactionHolder.stop(); // signal compaction to stop
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 2968374..f1181d8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -92,6 +92,10 @@ public class CompactionTask extends AbstractCompactionTask
         // it is not empty, it may compact down to nothing if all rows are deleted.
         assert sstables != null && sstableDirectory != null;
 
+        // Note that the current compaction strategy is not necessarily the one this task was created under.
+        // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
+        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
 
@@ -110,7 +114,6 @@ public class CompactionTask extends AbstractCompactionTask
         long startTime = System.currentTimeMillis();
         long totalkeysWritten = 0;
 
-        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
         long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact, cfs.metadata));
         long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / strategy.getMaxSSTableSize());
         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
@@ -217,22 +220,24 @@ public class CompactionTask extends AbstractCompactionTask
         {
             controller.close();
 
+            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+            // (in replaceCompactedSSTables)
+            if (taskId != null)
+                SystemTable.finishCompaction(taskId);
+
+            if (collector != null)
+                collector.finishCompaction(ci);
+
             try
             {
+                // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
+                // we don't end up with compaction information hanging around indefinitely in limbo.
                 iter.close();
             }
             catch (IOException e)
             {
                 throw new RuntimeException(e);
             }
-
-            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-            // (in replaceCompactedSSTables)
-            if (taskId != null)
-                SystemTable.finishCompaction(taskId);
-
-            if (collector != null)
-                collector.finishCompaction(ci);
         }
 
         cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 2258078..ffe45ad 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -87,9 +87,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
      * the only difference between background and maximal in LCS is that maximal is still allowed
      * (by explicit user request) even when compaction is disabled.
      */
-    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
-        if (cfs.isCompactionDisabled())
+        if (!isActive || cfs.isCompactionDisabled())
             return null;
 
         return getMaximalTask(gcBefore);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 9fb4a2b..38314ad 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.apache.cassandra.cql3.CFPropDefs;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@@ -129,8 +131,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         });
     }
 
-    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        if (!isActive)
+            return null;
+
         while (true)
         {
             List<SSTableReader> smallestBucket = getNextBackgroundSSTables(gcBefore);
@@ -145,14 +150,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
     public AbstractCompactionTask getMaximalTask(final int gcBefore)
     {
-        while (true)
-        {
-            List<SSTableReader> sstables = filterSuspectSSTables(cfs.getUncompactingSSTables());
-            if (sstables.isEmpty())
-                return null;
-            if (cfs.getDataTracker().markCompacting(sstables))
-                return new CompactionTask(cfs, sstables, gcBefore);
-        }
+        Collection<SSTableReader> sstables = cfs.markAllCompacting();
+        return new CompactionTask(cfs, sstables, gcBefore);
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index 2f96ef8..d8a7333 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -132,18 +132,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
     public void forceBlockingFlush()
     {
-        try
-        {
-            indexCfs.forceBlockingFlush();
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        indexCfs.forceBlockingFlush();
     }
 
     public void invalidate()
@@ -151,7 +140,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
         indexCfs.invalidate();
     }
 
-    public void truncate(long truncatedAt)
+    public void truncateBlocking(long truncatedAt)
     {
         indexCfs.discardSSTables(truncatedAt);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index f78061c..11e026d 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -164,7 +164,7 @@ public abstract class SecondaryIndex
      *
      * @param truncatedAt The truncation timestamp, all data before that timestamp should be rejected.
      */
-    public abstract void truncate(long truncatedAt);
+    public abstract void truncateBlocking(long truncatedAt);
 
     /**
      * Builds the index using the data in the underlying CFS
@@ -231,19 +231,8 @@ public abstract class SecondaryIndex
         {
             public void run()
             {
-                try
-                {
-                    baseCfs.forceBlockingFlush();
-                    buildIndexBlocking();
-                }
-                catch (ExecutionException e)
-                {
-                    throw new RuntimeException(e);
-                }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
+                baseCfs.forceBlockingFlush();
+                buildIndexBlocking();
             }
         };
         FutureTask<?> f = new FutureTask<Object>(runnable, null);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 5ac8b15..23e7842 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -335,53 +335,39 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
     {
         logger.info("Starting local schema reset...");
 
-        try
+        if (logger.isDebugEnabled())
+            logger.debug("Truncating schema tables...");
+
+        // truncate schema tables
+        SystemTable.schemaCFS(SystemTable.SCHEMA_KEYSPACES_CF).truncateBlocking();
+        SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF).truncateBlocking();
+        SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncateBlocking();
+
+        if (logger.isDebugEnabled())
+            logger.debug("Clearing local schema keyspace definitions...");
+
+        Schema.instance.clear();
+
+        Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
+        liveEndpoints.remove(FBUtilities.getBroadcastAddress());
+
+        // force migration if there are nodes around; first of all,
+        // check if there are nodes with versions >= 1.1.7 to request migrations from,
+        // because the migration format of nodes with versions < 1.1 is incompatible with older versions,
+        // and timestamps were broken in versions prior to 1.1.7
+        for (InetAddress node : liveEndpoints)
         {
-            if (logger.isDebugEnabled())
-                logger.debug("Truncating schema tables...");
-
-            // truncate schema tables
-            FBUtilities.waitOnFutures(new ArrayList<Future<?>>(3)
-            {{
-                SystemTable.schemaCFS(SystemTable.SCHEMA_KEYSPACES_CF).truncate();
-                SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF).truncate();
-                SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncate();
-            }});
-
-            if (logger.isDebugEnabled())
-                logger.debug("Clearing local schema keyspace definitions...");
-
-            Schema.instance.clear();
-
-            Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
-            liveEndpoints.remove(FBUtilities.getBroadcastAddress());
-
-            // force migration is there are nodes around, first of all
-            // check if there are nodes with versions >= 1.1.7 to request migrations from,
-            // because migration format of the nodes with versions < 1.1 is incompatible with older versions
-            // and due to broken timestamps in versions prior to 1.1.7
-            for (InetAddress node : liveEndpoints)
+            if (MessagingService.instance().getVersion(node) >= MessagingService.VERSION_117)
             {
-                if (MessagingService.instance().getVersion(node) >= MessagingService.VERSION_117)
-                {
-                    if (logger.isDebugEnabled())
-                        logger.debug("Requesting schema from " + node);
+                if (logger.isDebugEnabled())
+                    logger.debug("Requesting schema from " + node);
 
-                    FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node)));
-                    break;
-                }
+                FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node)));
+                break;
             }
-
-            logger.info("Local schema reset is complete.");
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
         }
+
+        logger.info("Local schema reset is complete.");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 78d9c74..3e08505 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2283,8 +2283,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceTableFlush(final String tableName, final String... columnFamilies)
-                throws IOException, ExecutionException, InterruptedException
+    public void forceTableFlush(final String tableName, final String... columnFamilies) throws IOException
     {
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, tableName, columnFamilies))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index 65a8a17..36f1e71 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -19,12 +19,10 @@
 package org.apache.cassandra.db.compaction;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
@@ -148,7 +146,7 @@ public class LongCompactionsTest extends SchemaLoader
 
         // make sure max timestamp of compacted sstables is recorded properly after compaction.
         CompactionsTest.assertMaxTimestamp(cfs, maxTimestampExpected);
-        cfs.truncate();
+        cfs.truncateBlocking();
     }
 
     private void forceCompactions(ColumnFamilyStore cfs) throws ExecutionException, InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 12a39ad..d35bf82 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -80,7 +80,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
     {
         Table table = Table.open("Keyspace1");
         ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
-        cfs.truncate().get();
+        cfs.truncateBlocking();
 
         RowMutation rm;
         rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
@@ -103,7 +103,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
     {
         Table table = Table.open("Keyspace1");
         ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
-        cfs.truncate().get();
+        cfs.truncateBlocking();
 
         List<IMutation> rms = new LinkedList<IMutation>();
         RowMutation rm;
@@ -417,7 +417,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         Table table = Table.open(keySpace);
         ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
-        cfs.truncate().get();
+        cfs.truncateBlocking();
 
         ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
         ByteBuffer colName = ByteBufferUtil.bytes("birthdate"); 
@@ -480,7 +480,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         Table table = Table.open(keySpace);
         ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
-        cfs.truncate().get();
+        cfs.truncateBlocking();
 
         ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
         ByteBuffer clusterKey = ByteBufferUtil.bytes("ck1");
@@ -1088,7 +1088,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         Table table = Table.open("Keyspace1");
         ColumnFamilyStore store = table.getColumnFamilyStore("Indexed1");
 
-        store.truncate();
+        store.truncateBlocking();
 
         for (int i = 0; i < 10; i++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 51d69b1..c4f5be0 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@ -56,7 +56,7 @@ public class RecoveryManagerTruncateTest extends SchemaLoader
 		assertNotNull(getFromTable(table, "Standard1", "keymulti", "col1"));
 
 		// and now truncate it
-		cfs.truncate().get();
+		cfs.truncateBlocking();
         CommitLog.instance.resetUnsafe();
 		CommitLog.instance.recover();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java
index 708c04f..7cac7f6 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java
@@ -119,7 +119,7 @@ public class SecondaryIndexColumnSizeTest
         }
 
         @Override
-        public void truncate(long truncatedAt)
+        public void truncateBlocking(long truncatedAt)
         {
         }
 
@@ -194,7 +194,7 @@ public class SecondaryIndexColumnSizeTest
         }
 
         @Override
-        public void truncate(long truncatedAt)
+        public void truncateBlocking(long truncatedAt)
         {
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 3a1b804..8b46cfa 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -34,7 +34,6 @@ import org.junit.runner.RunWith;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
@@ -452,7 +451,7 @@ public class CompactionsTest extends SchemaLoader
         }
 
 
-        cfs.truncate();
+        cfs.truncateBlocking();
         assertEquals(failures, sstablesToCorrupt);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d72e9381/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 161e7f8..f9dff11 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -40,6 +40,8 @@ import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -167,7 +169,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
             Thread.sleep(200);
         }
 
-        for(SSTableReader s : table.getColumnFamilyStore(cfname).getSSTables())
+        for (SSTableReader s : table.getColumnFamilyStore(cfname).getSSTables())
         {
             assertTrue(s.getSSTableLevel() != 6);
             strat.manifest.remove(s);
@@ -178,17 +180,17 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
 
         for(SSTableReader s : table.getColumnFamilyStore(cfname).getSSTables())
         {
-            assertTrue(s.getSSTableLevel() == 6);
+            assertEquals(6, s.getSSTableLevel());
         }
 
         int [] levels = strat.manifest.getAllLevelSize();
 
         for (int i =0; i < levels.length; i++)
         {
-            if (i!=6)
-                assertTrue(levels[i] == 0);
+            if (i == 6)
+                assertEquals(table.getColumnFamilyStore(cfname).getSSTables().size(), levels[i]);
             else
-                assertTrue(levels[i] == table.getColumnFamilyStore(cfname).getSSTables().size());
+                assertEquals(0, levels[i]);
         }
 
     }


Mime
View raw message