cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [5/5] cassandra git commit: Introduce safer durable sstable membership management (and simplify cleanup of compaction leftovers)
Date Fri, 24 Jul 2015 16:32:14 GMT
Introduce safer durable sstable membership management
(and simplify cleanup of compaction leftovers)

Instead of using temporary files and system tables,
this patch introduces a simple transaction log for sstable
membership edits that can be committed/aborted atomically
and simply replayed on startup.

patch by stefania; reviewed by benedict for CASSANDRA-7066


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b09e60f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b09e60f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b09e60f7

Branch: refs/heads/trunk
Commit: b09e60f72bb2f37235d9e9190c25db36371b3c18
Parents: e338d2f
Author: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Authored: Mon Apr 27 14:38:53 2015 +0800
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Fri Jul 24 14:41:51 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 bin/sstablelister                               |  55 ++
 bin/sstablelister.bat                           |  41 +
 build.xml                                       |   4 +-
 .../org/apache/cassandra/config/Config.java     |   2 +-
 .../cassandra/config/DatabaseDescriptor.java    |   2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 149 +---
 .../org/apache/cassandra/db/Directories.java    |  57 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  31 +-
 .../org/apache/cassandra/db/SystemKeyspace.java | 126 +--
 .../db/compaction/CompactionController.java     |   2 +-
 .../db/compaction/CompactionManager.java        |  22 +-
 .../cassandra/db/compaction/CompactionTask.java |  24 +-
 .../db/compaction/LeveledCompactionTask.java    |   8 +-
 .../cassandra/db/compaction/OperationType.java  |  18 +-
 .../db/compaction/SSTableSplitter.java          |   6 +-
 .../cassandra/db/compaction/Scrubber.java       |  18 +-
 .../SizeTieredCompactionStrategy.java           |   6 +-
 .../cassandra/db/compaction/Upgrader.java       |  13 +-
 .../writers/CompactionAwareWriter.java          |  10 +-
 .../writers/DefaultCompactionWriter.java        |  11 +-
 .../writers/MajorLeveledCompactionWriter.java   |  15 +-
 .../writers/MaxSSTableSizeWriter.java           |  25 +-
 .../SplittingSizeTieredCompactionWriter.java    |  39 +-
 .../apache/cassandra/db/lifecycle/Helpers.java  |  58 +-
 .../db/lifecycle/LifecycleTransaction.java      | 150 +++-
 .../apache/cassandra/db/lifecycle/Tracker.java  |  50 +-
 .../cassandra/db/lifecycle/TransactionLogs.java | 786 +++++++++++++++++++
 .../io/sstable/AbstractSSTableSimpleWriter.java |   8 +-
 .../apache/cassandra/io/sstable/Descriptor.java | 107 ++-
 .../io/sstable/IndexSummaryManager.java         |   7 +-
 .../apache/cassandra/io/sstable/SSTable.java    |  17 +-
 .../io/sstable/SSTableDeletingTask.java         | 130 ---
 .../cassandra/io/sstable/SSTableLoader.java     |  22 +-
 .../cassandra/io/sstable/SSTableRewriter.java   |  20 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   7 +-
 .../io/sstable/SSTableSimpleWriter.java         |   5 +-
 .../cassandra/io/sstable/SSTableTxnWriter.java  | 101 +++
 .../io/sstable/format/SSTableReader.java        | 171 ++--
 .../io/sstable/format/SSTableWriter.java        |  50 +-
 .../io/sstable/format/big/BigFormat.java        |  12 +-
 .../io/sstable/format/big/BigTableWriter.java   |  69 +-
 .../io/sstable/metadata/MetadataSerializer.java |   7 +-
 .../org/apache/cassandra/io/util/FileUtils.java |  65 ++
 .../cassandra/io/util/SequentialWriter.java     |  12 +-
 .../cassandra/service/CassandraDaemon.java      |  16 +-
 .../apache/cassandra/service/GCInspector.java   |  10 +-
 .../apache/cassandra/service/StartupChecks.java |   6 +-
 .../cassandra/service/StorageService.java       |   4 +-
 .../cassandra/streaming/StreamLockfile.java     | 128 ---
 .../cassandra/streaming/StreamReader.java       |   4 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  20 +-
 .../cassandra/streaming/StreamSession.java      |   7 +
 .../cassandra/tools/StandaloneLister.java       | 214 +++++
 .../cassandra/tools/StandaloneScrubber.java     |   5 +-
 .../cassandra/tools/StandaloneSplitter.java     |  10 +-
 .../cassandra/tools/StandaloneUpgrader.java     |  19 +-
 .../utils/concurrent/Transactional.java         |  25 +-
 .../la-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../la-1-big-Data.db                            | Bin 0 -> 93 bytes
 .../la-1-big-Digest.adler32                     |   1 +
 .../la-1-big-Filter.db                          | Bin 0 -> 16 bytes
 .../la-1-big-Index.db                           | Bin 0 -> 54 bytes
 .../la-1-big-Statistics.db                      | Bin 0 -> 4442 bytes
 .../la-1-big-Summary.db                         | Bin 0 -> 80 bytes
 .../la-1-big-TOC.txt                            |   8 +
 .../tmp-la-2-big-Data.db                        | Bin 0 -> 93 bytes
 .../tmp-la-2-big-Index.db                       | Bin 0 -> 54 bytes
 .../tmplink-la-2-big-Data.db                    | Bin 0 -> 93 bytes
 .../tmplink-la-2-big-Index.db                   | Bin 0 -> 54 bytes
 .../manifest.json                               |   1 +
 .../manifest.json                               |   1 +
 .../manifest.json                               |   1 +
 .../io/sstable/CQLSSTableWriterLongTest.java    |   5 +-
 test/unit/org/apache/cassandra/MockSchema.java  |  12 +-
 test/unit/org/apache/cassandra/Util.java        |   3 -
 .../org/apache/cassandra/cql3/CQLTester.java    |   2 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     | 150 +---
 .../apache/cassandra/db/DirectoriesTest.java    |  24 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   7 +-
 .../apache/cassandra/db/SystemKeyspaceTest.java |  48 ++
 .../db/compaction/AntiCompactionTest.java       |   4 +-
 .../compaction/CompactionAwareWriterTest.java   |   8 +-
 .../db/compaction/CompactionsTest.java          |  69 +-
 .../LeveledCompactionStrategyTest.java          |   5 +-
 .../cassandra/db/lifecycle/HelpersTest.java     |  25 +-
 .../db/lifecycle/LifecycleTransactionTest.java  |   9 +-
 .../db/lifecycle/RealTransactionsTest.java      | 228 ++++++
 .../cassandra/db/lifecycle/TrackerTest.java     |  40 +-
 .../db/lifecycle/TransactionLogsTest.java       | 558 +++++++++++++
 .../io/sstable/BigTableWriterTest.java          |  27 +-
 .../io/sstable/CQLSSTableWriterClientTest.java  |   9 +
 .../cassandra/io/sstable/DescriptorTest.java    |  18 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java | 123 ++-
 .../io/sstable/SSTableRewriterTest.java         | 173 ++--
 .../cassandra/io/sstable/SSTableUtils.java      |  11 +-
 .../metadata/MetadataSerializerTest.java        |   2 +-
 .../org/apache/cassandra/schema/DefsTest.java   |   4 +-
 .../streaming/StreamTransferTaskTest.java       |   3 +-
 .../concurrent/AbstractTransactionalTest.java   |  31 +-
 101 files changed, 3265 insertions(+), 1357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 70b26f5..a0dadc3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.0
+ * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)
  * Implement proper sandboxing for UDFs (CASSANDRA-9402)
  * Allow extra schema definitions in cassandra-stress yaml (CASSANDRA-9850)
  * Metrics should use up to date nomenclature (CASSANDRA-9448)
  * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/bin/sstablelister
----------------------------------------------------------------------
diff --git a/bin/sstablelister b/bin/sstablelister
new file mode 100755
index 0000000..a79409d
--- /dev/null
+++ b/bin/sstablelister
@@ -0,0 +1,55 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   ~/.cassandra.in.sh \
+                   "`dirname "$0"`/cassandra.in.sh"; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA="`which java`"
+fi
+
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+if [ "x$MAX_HEAP_SIZE" = "x" ]; then
+    MAX_HEAP_SIZE="256M"
+fi
+
+"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
+        -Dcassandra.storagedir="$cassandra_storagedir" \
+        -Dlogback.configurationFile=logback-tools.xml \
+        org.apache.cassandra.tools.StandaloneLister "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/bin/sstablelister.bat
----------------------------------------------------------------------
diff --git a/bin/sstablelister.bat b/bin/sstablelister.bat
new file mode 100644
index 0000000..cb50a08
--- /dev/null
+++ b/bin/sstablelister.bat
@@ -0,0 +1,41 @@
+@REM
+@REM  Licensed to the Apache Software Foundation (ASF) under one or more
+@REM  contributor license agreements.  See the NOTICE file distributed with
+@REM  this work for additional information regarding copyright ownership.
+@REM  The ASF licenses this file to You under the Apache License, Version 2.0
+@REM  (the "License"); you may not use this file except in compliance with
+@REM  the License.  You may obtain a copy of the License at
+@REM
+@REM      http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM  Unless required by applicable law or agreed to in writing, software
+@REM  distributed under the License is distributed on an "AS IS" BASIS,
+@REM  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM  See the License for the specific language governing permissions and
+@REM  limitations under the License.
+
+@echo off
+if "%OS%" == "Windows_NT" setlocal
+
+pushd "%~dp0"
+call cassandra.in.bat
+
+if NOT DEFINED CASSANDRA_MAIN set CASSANDRA_MAIN=org.apache.cassandra.tools.StandaloneLister
+if NOT DEFINED JAVA_HOME goto :err
+
+REM ***** JAVA options *****
+set JAVA_OPTS=^
+ -Dlogback.configurationFile=logback-tools.xml
+
+set TOOLS_PARAMS=
+
+"%JAVA_HOME%\bin\java" %JAVA_OPTS% %CASSANDRA_PARAMS% -cp %CASSANDRA_CLASSPATH% "%CASSANDRA_MAIN%" %*
+goto finally
+
+:err
+echo JAVA_HOME environment variable must be set!
+pause
+
+:finally
+
+ENDLOCAL

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 5a82d64..e6581ea 100644
--- a/build.xml
+++ b/build.xml
@@ -1214,7 +1214,7 @@
         <jvmarg value="-Xss256k"/>
         <jvmarg value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
         <jvmarg value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>
-	    <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/>
+        <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/>
         <jvmarg value="-Dcassandra.test.sstableformatdevelopment=true"/>
         <jvmarg value="-Dcassandra.testtag=@{testtag}"/> 
         <jvmarg value="-Dcassandra.keepBriefBrief=${cassandra.keepBriefBrief}" />
@@ -1882,7 +1882,7 @@
           <option name="MAIN_CLASS_NAME" value="" />
           <option name="METHOD_NAME" value="" />
           <option name="TEST_OBJECT" value="class" />
-          <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -ea" />
+          <option name="VM_PARAMETERS" value="-Dcassandra.debugrefcount=true -Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Dcorrupt-sstable-root=$PROJECT_DIR$/test/data/corrupt-sstables -Dmigration-sstable-root=${test.data}/migration-sstables -ea" />
           <option name="PARAMETERS" value="" />
           <option name="WORKING_DIRECTORY" value="" />
           <option name="ENV_VARIABLES" />

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2a2062e..fe6752f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -154,7 +154,7 @@ public class Config
     public volatile Integer stream_throughput_outbound_megabits_per_sec = 200;
     public volatile Integer inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
 
-    public String[] data_file_directories;
+    public String[] data_file_directories = new String[0];
 
     public String saved_caches_directory;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 01b1633..a25af65 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -490,7 +490,7 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("saved_caches_directory is missing and -Dcassandra.storagedir is not set", false);
             conf.saved_caches_directory += File.separator + "saved_caches";
         }
-        if (conf.data_file_directories == null)
+        if (conf.data_file_directories == null || conf.data_file_directories.length == 0)
         {
             String defaultDataDir = System.getProperty("cassandra.storagedir", null);
             if (defaultDataDir == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d3ad4e6..8d14120 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -60,18 +60,14 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.*;
-import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
-import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.TableMetrics.Sampler;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamLockfile;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.*;
 import org.apache.cassandra.utils.TopKSampler.SamplerResult;
@@ -522,45 +518,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         Directories directories = new Directories(metadata);

         // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
         clearEphemeralSnapshots(directories);

-        // remove any left-behind SSTables from failed/stalled streaming
-        FileFilter filter = new FileFilter()
-        {
-            public boolean accept(File pathname)
-            {
-                return pathname.getPath().endsWith(StreamLockfile.FILE_EXT);
-            }
-        };
-        for (File dir : directories.getCFDirectories())
-        {
-            File[] lockfiles = dir.listFiles(filter);
-            // lock files can be null if I/O error happens
-            if (lockfiles == null || lockfiles.length == 0)
-                continue;
-            logger.info("Removing SSTables from failed streaming session. Found {} files to cleanup.", lockfiles.length);
-
-            for (File lockfile : lockfiles)
-            {
-                StreamLockfile streamLockfile = new StreamLockfile(lockfile);
-                streamLockfile.cleanup();
-                streamLockfile.delete();
-            }
-        }
-
-        logger.debug("Removing compacted SSTable files from {} (see http://wiki.apache.org/cassandra/MemtableSSTable)", metadata.cfName);
+        logger.debug("Removing temporary or obsoleted files from unfinished operations for table {}", metadata.cfName);
+        LifecycleTransaction.removeUnfinishedLeftovers(metadata);

+        logger.debug("Further extra check for orphan sstable files for {}", metadata.cfName);
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
             Set<Component> components = sstableFiles.getValue();

-            if (desc.type.isTemporary)
-            {
-                SSTable.delete(desc, components);
-                continue;
-            }
+            for (File tmpFile : desc.getTemporaryFiles())
+                tmpFile.delete();

             File dataFile = new File(desc.filenameFor(Component.DATA));
             if (components.contains(Component.DATA) && dataFile.length() > 0)
@@ -571,7 +542,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             logger.warn("Removing orphans for {}: {}", desc, components);
             for (Component component : components)
             {
-                FileUtils.deleteWithConfirm(desc.filenameFor(component));
+                File file = new File(desc.filenameFor(component));
+                if (file.exists())
+                    FileUtils.deleteWithConfirm(file);
             }
         }

@@ -600,91 +573,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    /**
-     * Replacing compacted sstables is atomic as far as observers of Tracker are concerned, but not on the
-     * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then
-     * their ancestors are removed.
-     *
-     * If an unclean shutdown happens at the right time, we can thus end up with both the new ones and their
-     * ancestors "live" in the system.  This is harmless for normal data, but for counters it can cause overcounts.
-     *
-     * To prevent this, we record sstables being compacted in the system keyspace.  If we find unfinished
-     * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
-     * sstables from any given ancestor).
-     */
-    public static void removeUnfinishedCompactionLeftovers(CFMetaData metadata, Map<Integer, UUID> unfinishedCompactions)
-    {
-        Directories directories = new Directories(metadata);
-
-        Set<Integer> allGenerations = new HashSet<>();
-        for (Descriptor desc : directories.sstableLister().list().keySet())
-            allGenerations.add(desc.generation);
-
-        // sanity-check unfinishedCompactions
-        Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet();
-        if (!allGenerations.containsAll(unfinishedGenerations))
-        {
-            HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations);
-            missingGenerations.removeAll(allGenerations);
-            logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}",
-                         metadata.ksName, metadata.cfName, missingGenerations);
-        }
-
-        // remove new sstables from compactions that didn't complete, and compute
-        // set of ancestors that shouldn't exist anymore
-        Set<Integer> completedAncestors = new HashSet<>();
-        for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().skipTemporary(true).list().entrySet())
-        {
-            Descriptor desc = sstableFiles.getKey();
-
-            Set<Integer> ancestors;
-            try
-            {
-                CompactionMetadata compactionMetadata = (CompactionMetadata) desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION);
-                ancestors = compactionMetadata.ancestors;
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(e, desc.filenameFor(Component.STATS));
-            }
-            catch (NullPointerException e)
-            {
-                throw new FSReadError(e, "Failed to remove unfinished compaction leftovers (file: " + desc.filenameFor(Component.STATS) + ").  See log for details.");
-            }
-
-            if (!ancestors.isEmpty()
-                && unfinishedGenerations.containsAll(ancestors)
-                && allGenerations.containsAll(ancestors))
-            {
-                // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one
-                UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next());
-                assert compactionTaskID != null;
-                logger.debug("Going to delete unfinished compaction product {}", desc);
-                SSTable.delete(desc, sstableFiles.getValue());
-                SystemKeyspace.finishCompaction(compactionTaskID);
-            }
-            else
-            {
-                completedAncestors.addAll(ancestors);
-            }
-        }
-
-        // remove old sstables from compactions that did complete
-        for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
-        {
-            Descriptor desc = sstableFiles.getKey();
-            if (completedAncestors.contains(desc.generation))
-            {
-                // if any of the ancestors were participating in a compaction, finish that compaction
-                logger.debug("Going to delete leftover compaction ancestor {}", desc);
-                SSTable.delete(desc, sstableFiles.getValue());
-                UUID compactionTaskID = unfinishedCompactions.get(desc.generation);
-                if (compactionTaskID != null)
-                    SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation));
-            }
-        }
-    }
-
     // must be called after all sstables are loaded since row cache merges all row versions
     public void initRowCache()
     {
@@ -750,8 +638,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             if (currentDescriptors.contains(descriptor))
                 continue; // old (initialized) SSTable found, skipping
-            if (descriptor.type.isTemporary) // in the process of being written
-                continue;
 
             if (!descriptor.isCompatible())
                 throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
@@ -780,7 +666,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                descriptor.ksname,
                                                descriptor.cfname,
                                                fileIndexGenerator.incrementAndGet(),
-                                               Descriptor.Type.FINAL,
                                                descriptor.formatType);
             }
             while (new File(newDescriptor.filenameFor(Component.DATA)).exists());
@@ -851,24 +736,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return name;
     }
 
-    public String getTempSSTablePath(File directory)
+    public String getSSTablePath(File directory)
     {
-        return getTempSSTablePath(directory, DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), DatabaseDescriptor.getSSTableFormat());
+        return getSSTablePath(directory, DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), DatabaseDescriptor.getSSTableFormat());
     }
 
-    public String getTempSSTablePath(File directory, SSTableFormat.Type format)
+    public String getSSTablePath(File directory, SSTableFormat.Type format)
     {
-        return getTempSSTablePath(directory, format.info.getLatestVersion(), format);
+        return getSSTablePath(directory, format.info.getLatestVersion(), format);
     }
 
-    private String getTempSSTablePath(File directory, Version version, SSTableFormat.Type format)
+    private String getSSTablePath(File directory, Version version, SSTableFormat.Type format)
     {
         Descriptor desc = new Descriptor(version,
                                          directory,
                                          keyspace.getName(),
                                          name,
                                          fileIndexGenerator.incrementAndGet(),
-                                         Descriptor.Type.TEMP,
                                          format);
         return desc.filenameFor(Component.DATA);
     }
@@ -1883,11 +1767,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return directories.getSnapshotDetails();
     }
 
-    public boolean hasUnreclaimedSpace()
-    {
-        return metric.liveDiskSpaceUsed.getCount() < metric.totalDiskSpaceUsed.getCount();
-    }
-
     /**
      * @return the cached partition for @param key if it is already present in the cache.
      * Not that this will not readAndCache the parition if it is not present, nor

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 8b61c68..bede4c4 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
@@ -91,6 +92,7 @@ public class Directories
 
     public static final String BACKUPS_SUBDIR = "backups";
     public static final String SNAPSHOT_SUBDIR = "snapshots";
+    public static final String TRANSACTIONS_SUBDIR = "transactions";
     public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
 
     public static final DataDirectory[] dataDirectories;
@@ -466,6 +468,35 @@ public class Directories
         }
     }
 
+    public static File getTransactionsDirectory(File folder)
+    {
+        return getOrCreate(folder, TRANSACTIONS_SUBDIR);
+    }
+
+    public List<File> getExistingDirectories(String subFolder)
+    {
+        List<File> ret = new ArrayList<>();
+        for (File dir : dataPaths)
+        {
+            File subDir = getExistingDirectory(dir, subFolder);
+            if (subDir != null)
+                ret.add(subDir);
+
+        }
+        return ret;
+    }
+
+    public static File getExistingDirectory(File folder, String subFolder)
+    {
+        File subDir = new File(folder, join(subFolder));
+        if (subDir.exists())
+        {
+            assert(subDir.isDirectory());
+            return subDir;
+        }
+        return null;
+    }
+
     public SSTableLister sstableLister()
     {
         return new SSTableLister();
@@ -521,6 +552,7 @@ public class Directories
     public class SSTableLister
     {
         private boolean skipTemporary;
+        private boolean onlyTemporary;
         private boolean includeBackups;
         private boolean onlyBackups;
         private int nbFiles;
@@ -536,6 +568,14 @@ public class Directories
             return this;
         }
 
+        public SSTableLister onlyTemporary(boolean b)
+        {
+            if (filtered)
+                throw new IllegalStateException("list() has already been called");
+            onlyTemporary = b;
+            return this;
+        }
+
         public SSTableLister includeBackups(boolean b)
         {
             if (filtered)
@@ -593,21 +633,25 @@ public class Directories
 
                 if (snapshotName != null)
                 {
-                    getSnapshotDirectory(location, snapshotName).listFiles(getFilter());
+                    getSnapshotDirectory(location, snapshotName).listFiles(getFilter(location));
                     continue;
                 }
 
                 if (!onlyBackups)
-                    location.listFiles(getFilter());
+                    location.listFiles(getFilter(location));
 
                 if (includeBackups)
-                    getBackupsDirectory(location).listFiles(getFilter());
+                    getBackupsDirectory(location).listFiles(getFilter(location));
             }
             filtered = true;
         }
 
-        private FileFilter getFilter()
+        private FileFilter getFilter(File location)
         {
+           final Set<File> temporaryFiles = skipTemporary || onlyTemporary
+                                            ? LifecycleTransaction.getTemporaryFiles(metadata, location)
+                                            : Collections.<File>emptySet();
+
             return new FileFilter()
             {
                 // This function always return false since accepts adds to the components map
@@ -624,7 +668,10 @@ public class Directories
                     if (!pair.left.ksname.equals(metadata.ksName) || !pair.left.cfname.equals(metadata.cfName))
                         return false;
 
-                    if (skipTemporary && pair.left.type.isTemporary)
+                    if (skipTemporary && temporaryFiles.contains(file))
+                        return false;
+
+                    if (onlyTemporary && !temporaryFiles.contains(file))
                         return false;
 
                     Set<Component> previous = components.get(pair.left);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 71e03d5..ecaf063 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -28,6 +28,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -367,8 +370,7 @@ public class Memtable implements Comparable<Memtable>
             logger.info("Writing {}", Memtable.this.toString());
 
             SSTableReader ssTable;
-            // errors when creating the writer that may leave empty temp files.
-            try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+            try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
             {
                 boolean trackContention = logger.isDebugEnabled();
                 int heavilyContendedRowCount = 0;
@@ -400,10 +402,10 @@ public class Memtable implements Comparable<Memtable>
                 {
                     logger.info(String.format("Completed flushing %s (%s) for commitlog position %s",
                                               writer.getFilename(),
-                                              FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()),
+                                              FBUtilities.prettyPrintMemory(writer.getFilePointer()),
                                               context));
 
-                    // temp sstables should contain non-repaired data.
+                    // sstables should contain non-repaired data.
                     ssTable = writer.finish(true);
                 }
                 else
@@ -421,18 +423,23 @@ public class Memtable implements Comparable<Memtable>
             }
         }
 
-        public SSTableWriter createFlushWriter(String filename,
-                                               PartitionColumns columns,
-                                               EncodingStats stats)
+        public SSTableTxnWriter createFlushWriter(String filename,
+                                                  PartitionColumns columns,
+                                                  EncodingStats stats)
         {
+            // we operate "offline" here, as we expose the resulting reader consciously when done
+            // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
+            LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH, cfs.metadata);
             MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
-            return SSTableWriter.create(Descriptor.fromFilename(filename),
-                                        (long)partitions.size(),
-                                        ActiveRepairService.UNREPAIRED_SSTABLE,
-                                        cfs.metadata,
-                                        cfs.partitioner,
-                                        sstableMetadataCollector,
-                                        new SerializationHeader(cfs.metadata, columns, stats));
+            return new SSTableTxnWriter(txn,
+                                        SSTableWriter.create(Descriptor.fromFilename(filename),
+                                                             (long)partitions.size(),
+                                                             ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                             cfs.metadata,
+                                                             cfs.partitioner,
+                                                             sstableMetadataCollector,
+                                                             new SerializationHeader(cfs.metadata, columns, stats),
+                                                             txn));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 0957af6..6a4a847 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
-import com.google.common.base.Function;
 import com.google.common.collect.*;
 import com.google.common.io.ByteStreams;
 
@@ -47,9 +46,10 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.metrics.RestorableMeter;
@@ -97,7 +97,6 @@ public final class SystemKeyspace
     public static final String PEERS = "peers";
     public static final String PEER_EVENTS = "peer_events";
     public static final String RANGE_XFERS = "range_xfers";
-    public static final String COMPACTIONS_IN_PROGRESS = "compactions_in_progress";
     public static final String COMPACTION_HISTORY = "compaction_history";
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
     public static final String SIZE_ESTIMATES = "size_estimates";
@@ -216,16 +215,6 @@ public final class SystemKeyspace
                 + "requested_at timestamp,"
                 + "PRIMARY KEY ((token_bytes)))");
 
-    private static final CFMetaData CompactionsInProgress =
-        compile(COMPACTIONS_IN_PROGRESS,
-                "unfinished compactions",
-                "CREATE TABLE %s ("
-                + "id uuid,"
-                + "columnfamily_name text,"
-                + "inputs set<int>,"
-                + "keyspace_name text,"
-                + "PRIMARY KEY ((id)))");
-
     private static final CFMetaData CompactionHistory =
         compile(COMPACTION_HISTORY,
                 "week-long compaction history",
@@ -408,7 +397,6 @@ public final class SystemKeyspace
                          Peers,
                          PeerEvents,
                          RangeXfers,
-                         CompactionsInProgress,
                          CompactionHistory,
                          SSTableActivity,
                          SizeEstimates,
@@ -485,81 +473,6 @@ public final class SystemKeyspace
                             FBUtilities.getLocalAddress());
     }
 
-    /**
-     * Write compaction log, except columfamilies under system keyspace.
-     *
-     * @param cfs cfs to compact
-     * @param toCompact sstables to compact
-     * @return compaction task id or null if cfs is under system keyspace
-     */
-    public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
-    {
-        if (Schema.isSystemKeyspace(cfs.keyspace.getName()))
-            return null;
-
-        UUID compactionId = UUIDGen.getTimeUUID();
-        Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
-        {
-            public Integer apply(SSTableReader sstable)
-            {
-                return sstable.descriptor.generation;
-            }
-        });
-        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)";
-        executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
-        forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
-        return compactionId;
-    }
-
-    /**
-     * Deletes the entry for this compaction from the set of compactions in progress.  The compaction does not need
-     * to complete successfully for this to be called.
-     * @param taskId what was returned from {@code startCompaction}
-     */
-    public static void finishCompaction(UUID taskId)
-    {
-        assert taskId != null;
-
-        executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId);
-        forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
-    }
-
-    /**
-     * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
-     * task ID of the compaction they were participating in.
-     */
-    public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
-    {
-        String req = "SELECT * FROM system.%s";
-        UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS));
-
-        Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
-        for (UntypedResultSet.Row row : resultSet)
-        {
-            String keyspace = row.getString("keyspace_name");
-            String columnfamily = row.getString("columnfamily_name");
-            Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
-            UUID taskID = row.getUUID("id");
-
-            Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
-            Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
-            if (generationToTaskID == null)
-                generationToTaskID = new HashMap<>(inputs.size());
-
-            for (Integer generation : inputs)
-                generationToTaskID.put(generation, taskID);
-
-            unfinishedCompactions.put(kscf, generationToTaskID);
-        }
-        return unfinishedCompactions;
-    }
-
-    public static void discardCompactionsInProgress()
-    {
-        ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS);
-        compactionLog.truncateBlocking();
-    }
-
     public static void updateCompactionHistory(String ksname,
                                                String cfname,
                                                long compactedAt,
@@ -1227,7 +1140,7 @@ public final class SystemKeyspace
      *
      * @throws IOException
      */
-    public static void snapshotOnVersionChange() throws IOException
+    public static boolean snapshotOnVersionChange() throws IOException
     {
         String previous = getPreviousVersionString();
         String next = FBUtilities.getReleaseVersionString();
@@ -1242,7 +1155,10 @@ public final class SystemKeyspace
                                                                                     next));
             Keyspace systemKs = Keyspace.open(SystemKeyspace.NAME);
             systemKs.snapshot(snapshotName, null);
+            return true;
         }
+
+        return false;
     }
 
     /**
@@ -1282,6 +1198,36 @@ public final class SystemKeyspace
         return result.one().getString("release_version");
     }
 
+    /**
+     * Removes legacy table directories and legacy files (as recognised by Descriptor.isLegacyFile) from every
+     * data directory when migrating from 2.2 to 3.0; these checks can be removed in 4.0, see CASSANDRA-7066
+     */
+    public static void migrateDataDirs()
+    {
+        Iterable<String> dirs = Arrays.asList(DatabaseDescriptor.getAllDataFileLocations());
+        for (String dataDir : dirs)
+        {
+            logger.debug("Checking directory {} for old files", dataDir);
+            File dir = new File(dataDir);
+            assert dir.exists() : dir + " should have been created by startup checks";
+
+            for (File ksdir : dir.listFiles(File::isDirectory)) // test each entry, not the parent being listed
+            {
+                for (File cfdir : ksdir.listFiles(File::isDirectory))
+                {
+                    if (Descriptor.isLegacyFile(cfdir.getName()))
+                    {
+                        FileUtils.deleteRecursive(cfdir);
+                    }
+                    else
+                    {
+                        FileUtils.delete(cfdir.listFiles((d, n) -> Descriptor.isLegacyFile(n)));
+                    }
+                }
+            }
+        }
+    }
+
     private static ByteBuffer rangeToBytes(Range<Token> range)
     {
         try (DataOutputBuffer out = new DataOutputBuffer())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 303de15..df3bc4e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -63,7 +63,7 @@ public class CompactionController implements AutoCloseable
         refreshOverlaps();
     }
 
-    void maybeRefreshOverlaps()
+    public void maybeRefreshOverlaps()
     {
         for (SSTableReader reader : overlappingSSTables)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 6cf2e18..616c310 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -823,7 +823,7 @@ public class CompactionManager implements CompactionManagerMBean
              CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
         {
-            writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+            writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, txn));
 
             while (ci.hasNext())
             {
@@ -948,24 +948,27 @@ public class CompactionManager implements CompactionManagerMBean
                                              File compactionFileLocation,
                                              int expectedBloomFilterSize,
                                              long repairedAt,
-                                             SSTableReader sstable)
+                                             SSTableReader sstable,
+                                             LifecycleTransaction txn)
     {
         FileUtils.createDirectory(compactionFileLocation);
 
         return SSTableWriter.create(cfs.metadata,
-                                    Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)),
+                                    Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)),
                                     expectedBloomFilterSize,
                                     repairedAt,
                                     sstable.getSSTableLevel(),
                                     cfs.partitioner,
-                                    sstable.header);
+                                    sstable.header,
+                                    txn);
     }
 
     public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs,
                                                               File compactionFileLocation,
                                                               int expectedBloomFilterSize,
                                                               long repairedAt,
-                                                              Collection<SSTableReader> sstables)
+                                                              Collection<SSTableReader> sstables,
+                                                              LifecycleTransaction txn)
     {
         FileUtils.createDirectory(compactionFileLocation);
         int minLevel = Integer.MAX_VALUE;
@@ -983,13 +986,14 @@ public class CompactionManager implements CompactionManagerMBean
                 break;
             }
         }
-        return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)),
+        return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)),
                                     (long) expectedBloomFilterSize,
                                     repairedAt,
                                     cfs.metadata,
                                     cfs.partitioner,
                                     new MetadataCollector(sstables, cfs.metadata.comparator, minLevel),
-                                    SerializationHeader.make(cfs.metadata, sstables));
+                                    SerializationHeader.make(cfs.metadata, sstables),
+                                    txn);
     }
 
 
@@ -1198,8 +1202,8 @@ public class CompactionManager implements CompactionManagerMBean
         {
             int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
 
-            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
-            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
+            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet, anticompactionGroup));
+            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet, anticompactionGroup));
 
             while (ci.hasNext())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 6335834..7897a1a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -46,7 +45,6 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 public class CompactionTask extends AbstractCompactionTask
@@ -128,7 +126,7 @@ public class CompactionTask extends AbstractCompactionTask
             }
         });
 
-        UUID taskId = SystemKeyspace.startCompaction(cfs, transaction.originals());
+        UUID taskId = transaction.opId();
 
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
@@ -139,8 +137,8 @@ public class CompactionTask extends AbstractCompactionTask
             ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
         }
         ssTableLoggerMsg.append("]");
-        String taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString();
-        logger.info("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg);
+
+        logger.info("Compacting ({}) {}", taskId, ssTableLoggerMsg);
 
         long start = System.nanoTime();
         long totalKeysWritten = 0;
@@ -186,16 +184,11 @@ public class CompactionTask extends AbstractCompactionTask
                         }
                     }
 
-                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                    // point of no return
                     newSStables = writer.finish();
                 }
                 finally
                 {
-                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-                    // (in replaceCompactedSSTables)
-                    if (taskId != null)
-                        SystemKeyspace.finishCompaction(taskId);
-
                     if (collector != null)
                         collector.finishCompaction(ci);
 
@@ -217,7 +210,7 @@ public class CompactionTask extends AbstractCompactionTask
             long totalSourceRows = 0;
             String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
             logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                      taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
+                                      taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
             logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
             logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
 
@@ -227,10 +220,11 @@ public class CompactionTask extends AbstractCompactionTask
     }
 
     @Override
-    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, Set<SSTableReader> nonExpiredSSTables)
+    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                          LifecycleTransaction transaction,
+                                                          Set<SSTableReader> nonExpiredSSTables)
     {
-        return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, compactionType);
-
+        return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline);
     }
 
     public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index 1c3b686..d3d56ac 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -41,11 +41,13 @@ public class LeveledCompactionTask extends CompactionTask
     }
 
     @Override
-    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                          LifecycleTransaction txn,
+                                                          Set<SSTableReader> nonExpiredSSTables)
     {
         if (majorCompaction)
-            return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, compactionType);
-        return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, compactionType);
+            return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false);
+        return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index a14f13f..5b6ce05 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -32,13 +32,27 @@ public enum OperationType
     TOMBSTONE_COMPACTION("Tombstone Compaction"),
     UNKNOWN("Unknown compaction type"),
     ANTICOMPACTION("Anticompaction after repair"),
-    VERIFY("Verify");
+    VERIFY("Verify"),
+    FLUSH("Flush"),
+    STREAM("Stream"),
+    WRITE("Write");
 
-    private final String type;
+    public final String type;
+    public final String fileName;
 
     OperationType(String type)
     {
         this.type = type;
+        this.fileName = type.toLowerCase(java.util.Locale.ROOT).replace(" ", ""); // must not vary with default locale
+    }
+
+    public static OperationType fromFileName(String fileName)
+    {
+        for (OperationType opType : OperationType.values())
+            if (opType.fileName.equals(fileName))
+                return opType;
+
+        throw new IllegalArgumentException("Invalid fileName for operation type: " + fileName);
     }
 
     public String toString()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index e9a4f05..8f382ea 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -74,9 +74,11 @@ public class SSTableSplitter {
         }
 
         @Override
-        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              LifecycleTransaction txn,
+                                                              Set<SSTableReader> nonExpiredSSTables)
         {
-            return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, compactionType);
+            return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 94f3af7..c853157 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -59,6 +59,7 @@ public class Scrubber implements Closeable
     private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 
     private final boolean isOffline;
+    private final boolean keepOriginals;
 
     private SSTableReader newSstable;
     private SSTableReader newInOrderSstable;
@@ -85,11 +86,17 @@ public class Scrubber implements Closeable
 
     public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
     {
-        this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData);
+        this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData, false);
     }
 
     @SuppressWarnings("resource")
-    public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException
+    public Scrubber(ColumnFamilyStore cfs,
+                    LifecycleTransaction transaction,
+                    boolean skipCorrupted,
+                    OutputHandler outputHandler,
+                    boolean isOffline,
+                    boolean checkData,
+                    boolean keepOriginals) throws IOException
     {
         this.cfs = cfs;
         this.transaction = transaction;
@@ -97,6 +104,7 @@ public class Scrubber implements Closeable
         this.outputHandler = outputHandler;
         this.skipCorrupted = skipCorrupted;
         this.isOffline = isOffline;
+        this.keepOriginals = keepOriginals;
         this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
                                                                                                         sstable.descriptor.version,
                                                                                                         sstable.header);
@@ -149,7 +157,7 @@ public class Scrubber implements Closeable
     {
         outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
         int nowInSec = FBUtilities.nowInSeconds();
-        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline))
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline).keepOriginals(isOffline))
         {
             nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null;
             if (indexAvailable())
@@ -159,7 +167,7 @@ public class Scrubber implements Closeable
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
-            writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+            writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, transaction));
 
             DecoratedKey prevKey = null;
 
@@ -291,7 +299,7 @@ public class Scrubber implements Closeable
             {
                 // out of order rows, but no bad rows found - we can keep our repairedAt time
                 long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
-                try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);)
+                try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable, transaction))
                 {
                     for (Partition partition : outOfOrder)
                         inOrderWriter.append(partition.unfilteredIterator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 74a9757..0ece341 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -347,9 +347,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         }
 
         @Override
-        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              LifecycleTransaction txn,
+                                                              Set<SSTableReader> nonExpiredSSTables)
         {
-            return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables, compactionType);
+            return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index e3764c8..be0dd2a 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -43,7 +43,6 @@ public class Upgrader
     private final LifecycleTransaction transaction;
     private final File directory;
 
-    private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
     private final CompactionController controller;
     private final CompactionStrategyManager strategyManager;
     private final long estimatedRows;
@@ -80,23 +79,23 @@ public class Upgrader
                 sstableMetadataCollector.addAncestor(i);
         }
         sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
-        return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)),
+        return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(directory)),
                                     estimatedRows,
                                     repairedAt,
                                     cfs.metadata,
                                     cfs.partitioner,
                                     sstableMetadataCollector,
-                                    SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)));
+                                    SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)),
+                                    transaction);
     }
 
-    public void upgrade()
+    public void upgrade(boolean keepOriginals)
     {
         outputHandler.output("Upgrading " + sstable);
-
         int nowInSec = FBUtilities.nowInSeconds();
-        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true);
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true).keepOriginals(keepOriginals);
              AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
-             CompactionIterator iter = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
+             CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 610592f..f8c73d3 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -42,16 +42,22 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     protected final long estimatedTotalKeys;
     protected final long maxAge;
     protected final long minRepairedAt;
+
+    protected final LifecycleTransaction txn;
     protected final SSTableRewriter sstableWriter;
 
-    public CompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline)
+    public CompactionAwareWriter(ColumnFamilyStore cfs,
+                                 LifecycleTransaction txn,
+                                 Set<SSTableReader> nonExpiredSSTables,
+                                 boolean offline)
     {
         this.cfs = cfs;
         this.nonExpiredSSTables = nonExpiredSSTables;
         this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
         this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
-        this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline);
+        this.txn = txn;
+        this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(offline);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 8fc7bec..cdacddc 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -27,14 +27,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 
-
 /**
  * The default compaction writer - creates one output file in L0
  */
@@ -43,20 +41,21 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
     protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
 
     @SuppressWarnings("resource")
-    public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType)
+    public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline)
     {
         super(cfs, txn, nonExpiredSSTables, offline);
         logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
-        long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
+        long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
                                                     estimatedTotalKeys,
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     cfs.partitioner,
                                                     new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
         sstableWriter.switchWriter(writer);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 5328fa5..ad58967 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.LeveledManifest;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -49,12 +48,12 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
     private final boolean skipAncestors;
 
     @SuppressWarnings("resource")
-    public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType)
+    public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline)
     {
         super(cfs, txn, nonExpiredSSTables, offline);
         this.maxSSTableSize = maxSSTableSize;
         this.allSSTables = txn.originals();
-        expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType));
+        expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()));
         long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
         long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
@@ -64,13 +63,14 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
             logger.warn("Many sstables involved in compaction, skipping storing ancestor information to avoid running out of memory");
 
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
                                                     keysPerSSTable,
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     cfs.partitioner,
                                                     new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
         sstableWriter.switchWriter(writer);
     }
 
@@ -92,13 +92,14 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
 
             averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
             File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
+            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
                                                         averageEstimatedKeysPerSSTable,
                                                         minRepairedAt,
                                                         cfs.metadata,
                                                         cfs.partitioner,
                                                         new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
-                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                        txn);
             sstableWriter.switchWriter(writer);
             partitionsWritten = 0;
             sstablesWritten++;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 4832fd5..9902357 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -41,25 +40,26 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
     private final Set<SSTableReader> allSSTables;
 
     @SuppressWarnings("resource")
-    public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType)
+    public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline)
     {
         super(cfs, txn, nonExpiredSSTables, offline);
         this.allSSTables = txn.originals();
         this.level = level;
         this.maxSSTableSize = maxSSTableSize;
-        long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
+        long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
         expectedWriteSize = Math.min(maxSSTableSize, totalSize);
         estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
                                                     estimatedTotalKeys / estimatedSSTables,
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     cfs.partitioner,
                                                     new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
         sstableWriter.switchWriter(writer);
     }
 
@@ -71,13 +71,14 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         {
             File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
             @SuppressWarnings("resource")
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
-                                                                estimatedTotalKeys / estimatedSSTables,
-                                                                minRepairedAt,
-                                                                cfs.metadata,
-                                                                cfs.partitioner,
-                                                                new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
-                                                                SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+                                                        estimatedTotalKeys / estimatedSSTables,
+                                                        minRepairedAt,
+                                                        cfs.metadata,
+                                                        cfs.partitioner,
+                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
+                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                        txn);
 
             sstableWriter.switchWriter(writer);
         }


Mime
View raw message