cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [2/2] cassandra git commit: Give compaction strategies more control over sstable creation
Date Thu, 20 Aug 2015 18:53:02 GMT
Give compaction strategies more control over sstable creation

Patch by Blake Eggleston; reviewed by marcuse for CASSANDRA-8671


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ed27277
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ed27277
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ed27277

Branch: refs/heads/cassandra-3.0
Commit: 9ed2727739c73d64086d09a86a407a77390f081a
Parents: 0d86645
Author: Blake Eggleston <bdeggleston@gmail.com>
Authored: Thu Aug 6 10:19:55 2015 -0700
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Thu Aug 20 20:47:40 2015 +0200

----------------------------------------------------------------------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  73 +++++++++---
 .../org/apache/cassandra/db/Directories.java    |  42 +++++--
 src/java/org/apache/cassandra/db/Keyspace.java  |   5 +
 src/java/org/apache/cassandra/db/Memtable.java  |  32 +++--
 .../compaction/AbstractCompactionStrategy.java  |  24 +++-
 .../db/compaction/AbstractCompactionTask.java   |   3 +-
 .../db/compaction/CompactionManager.java        |   6 +-
 .../compaction/CompactionStrategyManager.java   |  40 +++++--
 .../cassandra/db/compaction/CompactionTask.java |  22 ++--
 .../db/compaction/LeveledCompactionTask.java    |   6 +-
 .../db/compaction/SSTableSplitter.java          |   3 +-
 .../cassandra/db/compaction/Scrubber.java       |   3 +-
 .../SizeTieredCompactionStrategy.java           |   4 +-
 .../writers/CompactionAwareWriter.java          |  53 ++++++---
 .../writers/DefaultCompactionWriter.java        |  32 ++---
 .../writers/MajorLeveledCompactionWriter.java   |  46 ++++----
 .../writers/MaxSSTableSizeWriter.java           |  45 ++++---
 .../SplittingSizeTieredCompactionWriter.java    |  52 ++++-----
 .../db/lifecycle/LifecycleTransaction.java      |   9 ++
 .../apache/cassandra/db/lifecycle/Tracker.java  |  34 +++---
 .../org/apache/cassandra/db/lifecycle/View.java |   4 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |  11 +-
 .../io/sstable/SSTableMultiWriter.java          |  54 +++++++++
 .../cassandra/io/sstable/SSTableTxnWriter.java  |  43 +++++--
 .../io/sstable/SimpleSSTableMultiWriter.java    | 116 +++++++++++++++++++
 .../notifications/SSTableAddedNotification.java |   4 +-
 .../cassandra/streaming/StreamReader.java       |  22 ++--
 .../cassandra/streaming/StreamReceiveTask.java  |  22 ++--
 .../compress/CompressedStreamReader.java        |   8 +-
 .../streaming/messages/IncomingFileMessage.java |   7 +-
 .../cassandra/tools/SSTableExpiredBlockers.java |   3 +-
 .../cassandra/tools/SSTableLevelResetter.java   |   2 +-
 .../cassandra/tools/SSTableOfflineRelevel.java  |   5 +-
 .../cassandra/tools/StandaloneScrubber.java     |   2 +-
 .../cassandra/tools/StandaloneUpgrader.java     |   2 +-
 .../cassandra/tools/StandaloneVerifier.java     |   7 +-
 .../db/compaction/LongCompactionsTest.java      |   6 +-
 test/unit/org/apache/cassandra/MockSchema.java  |   2 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   4 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  12 +-
 .../db/compaction/AntiCompactionTest.java       |  10 +-
 .../compaction/CompactionAwareWriterTest.java   |   8 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 .../db/lifecycle/RealTransactionsTest.java      |   8 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |  19 +--
 .../apache/cassandra/db/lifecycle/ViewTest.java |   2 +-
 .../io/sstable/BigTableWriterTest.java          |   4 +-
 .../io/sstable/CQLSSTableWriterClientTest.java  |   2 +
 .../io/sstable/SSTableRewriterTest.java         |  10 +-
 .../cassandra/io/sstable/SSTableUtils.java      |  25 ++--
 .../org/apache/cassandra/schema/DefsTest.java   |   6 +-
 51 files changed, 651 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a12de0a..b199c77 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.TableMetrics.Sampler;
 import org.apache.cassandra.metrics.TableMetrics;
@@ -75,6 +76,33 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
+    // the directories used to load sstables on cfs instantiation
+    private static volatile Directories.DataDirectory[] initialDirectories = Directories.dataDirectories;
+
+    /**
+     * a hook to add additional directories to initialDirectories.
+     * Any additional directories should be added prior to ColumnFamilyStore instantiation on startup
+     */
+    public static synchronized void addInitialDirectories(Directories.DataDirectory[] newDirectories)
+    {
+        assert newDirectories != null;
+
+        Set<Directories.DataDirectory> existing = Sets.newHashSet(initialDirectories);
+
+        List<Directories.DataDirectory> replacementList = Lists.newArrayList(initialDirectories);
+        for (Directories.DataDirectory directory: newDirectories)
+        {
+            if (!existing.contains(directory))
+            {
+                replacementList.add(directory);
+            }
+        }
+
+        Directories.DataDirectory[] replacementArray = new Directories.DataDirectory[replacementList.size()];
+        replacementList.toArray(replacementArray);
+        initialDirectories = replacementArray;
+    }
+
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
     private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
@@ -164,7 +192,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private volatile DefaultInteger maxCompactionThreshold;
     private final CompactionStrategyManager compactionStrategyManager;
 
-    public final Directories directories;
+    private volatile Directories directories;
 
     public final TableMetrics metric;
     public volatile long sampleLatencyNanos;
@@ -189,6 +217,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 cfs.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold());
 
         compactionStrategyManager.maybeReload(metadata);
+        directories = compactionStrategyManager.getDirectories();
 
         scheduleFlush();
 
@@ -330,6 +359,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                               boolean loadSSTables,
                               boolean registerBookkeeping)
     {
+        assert directories != null;
         assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
 
         this.keyspace = keyspace;
@@ -363,6 +393,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         // compaction strategy should be created after the CFS has been prepared
         this.compactionStrategyManager = new CompactionStrategyManager(this);
+        this.directories = this.compactionStrategyManager.getDirectories();
 
         if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
         {
@@ -426,6 +457,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    public Directories getDirectories()
+    {
+        return directories;
+    }
+
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+    {
+        MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
+        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+    }
+
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
+    {
+        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, txn);
+    }
+
     /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
     public void invalidate()
     {
@@ -499,7 +546,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                                          boolean loadSSTables)
     {
         // get the max generation number, to prevent generation conflicts
-        Directories directories = new Directories(metadata);
+        Directories directories = new Directories(metadata, initialDirectories);
         Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
         List<Integer> generations = new ArrayList<Integer>();
         for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
@@ -633,7 +680,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             currentDescriptors.add(sstable.descriptor);
         Set<SSTableReader> newSSTables = new HashSet<>();
 
-        Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
+        Directories.SSTableLister lister = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
         for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
         {
             Descriptor descriptor = entry.getKey();
@@ -1378,9 +1425,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null));
     }
 
-    void replaceFlushed(Memtable memtable, SSTableReader sstable)
+    void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
     {
-        compactionStrategyManager.replaceFlushed(memtable, sstable);
+        compactionStrategyManager.replaceFlushed(memtable, sstables);
     }
 
     public boolean isValid()
@@ -1580,7 +1627,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName)
     {
-        final File manifestFile = directories.getSnapshotManifestFile(snapshotName);
+        final File manifestFile = getDirectories().getSnapshotManifestFile(snapshotName);
 
         try
         {
@@ -1602,7 +1649,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private void createEphemeralSnapshotMarkerFile(final String snapshot)
     {
-        final File ephemeralSnapshotMarker = directories.getNewEphemeralSnapshotMarkerFile(snapshot);
+        final File ephemeralSnapshotMarker = getDirectories().getNewEphemeralSnapshotMarkerFile(snapshot);
 
         try
         {
@@ -1635,7 +1682,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Map<Integer, SSTableReader> active = new HashMap<>();
         for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))
             active.put(sstable.descriptor.generation, sstable);
-        Map<Descriptor, Set<Component>> snapshots = directories.sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list();
+        Map<Descriptor, Set<Component>> snapshots = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list();
         Refs<SSTableReader> refs = new Refs<>();
         try
         {
@@ -1692,12 +1739,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public boolean snapshotExists(String snapshotName)
     {
-        return directories.snapshotExists(snapshotName);
+        return getDirectories().snapshotExists(snapshotName);
     }
 
     public long getSnapshotCreationTime(String snapshotName)
     {
-        return directories.snapshotCreationTime(snapshotName);
+        return getDirectories().snapshotCreationTime(snapshotName);
     }
 
     /**
@@ -1708,7 +1755,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public void clearSnapshot(String snapshotName)
     {
-        List<File> snapshotDirs = directories.getCFDirectories();
+        List<File> snapshotDirs = getDirectories().getCFDirectories();
         Directories.clearSnapshot(snapshotName, snapshotDirs);
     }
     /**
@@ -1718,7 +1765,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public Map<String, Pair<Long,Long>> getSnapshotDetails()
     {
-        return directories.getSnapshotDetails();
+        return getDirectories().getSnapshotDetails();
     }
 
     /**
@@ -2251,7 +2298,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public long trueSnapshotsSize()
     {
-        return directories.trueSnapshotsSize();
+        return getDirectories().trueSnapshotsSize();
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index fa01269..90d2085 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -178,30 +178,36 @@ public class Directories
     }
 
     private final CFMetaData metadata;
+    private final DataDirectory[] paths;
     private final File[] dataPaths;
 
+    public Directories(final CFMetaData metadata)
+    {
+        this(metadata, dataDirectories);
+    }
     /**
      * Create Directories of given ColumnFamily.
      * SSTable directories are created under data_directories defined in cassandra.yaml if not exist at this time.
      *
      * @param metadata metadata of ColumnFamily
      */
-    public Directories(final CFMetaData metadata)
+    public Directories(final CFMetaData metadata, DataDirectory[] paths)
     {
         this.metadata = metadata;
+        this.paths = paths;
 
         String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
         int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
         String cfName = idx >= 0 ? metadata.cfName.substring(0, idx) : metadata.cfName;
         String indexNameWithDot = idx >= 0 ? metadata.cfName.substring(idx) : null;
 
-        this.dataPaths = new File[dataDirectories.length];
+        this.dataPaths = new File[paths.length];
         // If upgraded from version less than 2.1, use existing directories
         String oldSSTableRelativePath = join(metadata.ksName, cfName);
-        for (int i = 0; i < dataDirectories.length; ++i)
+        for (int i = 0; i < paths.length; ++i)
         {
             // check if old SSTable directory exists
-            dataPaths[i] = new File(dataDirectories[i].location, oldSSTableRelativePath);
+            dataPaths[i] = new File(paths[i].location, oldSSTableRelativePath);
         }
         boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
         {
@@ -214,13 +220,13 @@ public class Directories
         {
             // use 2.1+ style
             String newSSTableRelativePath = join(metadata.ksName, cfName + '-' + cfId);
-            for (int i = 0; i < dataDirectories.length; ++i)
-                dataPaths[i] = new File(dataDirectories[i].location, newSSTableRelativePath);
+            for (int i = 0; i < paths.length; ++i)
+                dataPaths[i] = new File(paths[i].location, newSSTableRelativePath);
         }
         // if index, then move to its own directory
         if (indexNameWithDot != null)
         {
-            for (int i = 0; i < dataDirectories.length; ++i)
+            for (int i = 0; i < paths.length; ++i)
                 dataPaths[i] = new File(dataPaths[i], indexNameWithDot);
         }
 
@@ -327,7 +333,7 @@ public class Directories
 
         // pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
         boolean tooBig = false;
-        for (DataDirectory dataDir : dataDirectories)
+        for (DataDirectory dataDir : paths)
         {
             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
             {
@@ -393,7 +399,7 @@ public class Directories
         long writeSize = expectedTotalWriteSize / estimatedSSTables;
         long totalAvailable = 0L;
 
-        for (DataDirectory dataDir : dataDirectories)
+        for (DataDirectory dataDir : paths)
         {
             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
                   continue;
@@ -481,6 +487,24 @@ public class Directories
         {
             return location.getUsableSpace();
         }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            DataDirectory that = (DataDirectory) o;
+
+            return location.equals(that.location);
+
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return location.hashCode();
+        }
     }
 
     static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 78b593b..c2613fe 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -210,6 +210,11 @@ public class Keyspace
         return cfs;
     }
 
+    public boolean hasColumnFamilyStore(UUID id)
+    {
+        return columnFamilyStores.containsKey(id);
+    }
+
     /**
      * Take a snapshot of the specific column family, or the entire set of column families
      * if columnFamily is null with a given timestamp

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1b30fc7..4a54666 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -47,7 +47,6 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -345,22 +344,22 @@ public class Memtable implements Comparable<Memtable>
         {
             long writeSize = getExpectedWriteSize();
             Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
-            File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
+            File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
             assert sstableDirectory != null : "Flush task is not bound to any disk";
-            SSTableReader sstable = writeSortedContents(context, sstableDirectory);
-            cfs.replaceFlushed(Memtable.this, sstable);
+            Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
+            cfs.replaceFlushed(Memtable.this, sstables);
         }
 
         protected Directories getDirectories()
         {
-            return cfs.directories;
+            return cfs.getDirectories();
         }
 
-        private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory)
+        private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
         {
             logger.info("Writing {}", Memtable.this.toString());
 
-            SSTableReader ssTable;
+            Collection<SSTableReader> ssTables;
             try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
             {
                 boolean trackContention = logger.isDebugEnabled();
@@ -397,20 +396,20 @@ public class Memtable implements Comparable<Memtable>
                                               context));
 
                     // sstables should contain non-repaired data.
-                    ssTable = writer.finish(true);
+                    ssTables = writer.finish(true);
                 }
                 else
                 {
                     logger.info("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
                                 writer.getFilename(), context);
                     writer.abort();
-                    ssTable = null;
+                    ssTables = null;
                 }
 
                 if (heavilyContendedRowCount > 0)
                     logger.debug(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
 
-                return ssTable;
+                return ssTables;
             }
         }
 
@@ -423,13 +422,12 @@ public class Memtable implements Comparable<Memtable>
             LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH, cfs.metadata);
             MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
             return new SSTableTxnWriter(txn,
-                                        SSTableWriter.create(Descriptor.fromFilename(filename),
-                                                             (long)partitions.size(),
-                                                             ActiveRepairService.UNREPAIRED_SSTABLE,
-                                                             cfs.metadata,
-                                                             sstableMetadataCollector,
-                                                             new SerializationHeader(cfs.metadata, columns, stats),
-                                                             txn));
+                                        cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+                                                                     (long)partitions.size(),
+                                                                     ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                                     sstableMetadataCollector,
+                                                                     new SerializationHeader(cfs.metadata, columns, stats),
+                                                                     txn));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index d9c9ea3..721fd70 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -25,7 +25,12 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +43,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -113,6 +119,11 @@ public abstract class AbstractCompactionStrategy
         }
     }
 
+    public Directories getDirectories()
+    {
+        return cfs.getDirectories();
+    }
+
     /**
      * For internal, temporary suspension of background compactions so that we can do exceptional
      * things like truncate or major compaction
@@ -222,12 +233,12 @@ public abstract class AbstractCompactionStrategy
      * Handle a flushed memtable.
      *
      * @param memtable the flushed memtable
-     * @param sstable the written sstable. can be null if the memtable was clean.
+     * @param sstables the written sstables. can be null or empty if the memtable was clean.
      */
-    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
     {
-        cfs.getTracker().replaceFlushed(memtable, sstable);
-        if (sstable != null)
+        cfs.getTracker().replaceFlushed(memtable, sstables);
+        if (sstables != null && !sstables.isEmpty())
             CompactionManager.instance.submitBackground(cfs);
     }
 
@@ -493,4 +504,9 @@ public abstract class AbstractCompactionStrategy
             groupedSSTables.add(currGroup);
         return groupedSSTables;
     }
+
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleTransaction txn)
+    {
+        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index 3bf224e..155bf2f 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -63,7 +64,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
             transaction.close();
         }
     }
-    public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
+    public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
 
     protected abstract int executeInternal(CompactionExecutorStatsCollector collector);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 8aa16d5..66f9ed5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -594,7 +594,7 @@ public class CompactionManager implements CompactionManagerMBean
             }
             // group by keyspace/columnfamily
             ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname);
-            descriptors.put(cfs, cfs.directories.find(new File(filename.trim()).getName()));
+            descriptors.put(cfs, cfs.getDirectories().find(new File(filename.trim()).getName()));
         }
 
         List<Future<?>> futures = new ArrayList<>();
@@ -817,7 +817,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         logger.info("Cleaning up {}", sstable);
 
-        File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
+        File compactionFileLocation = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
         if (compactionFileLocation == null)
             throw new IOException("disk full");
 
@@ -1192,7 +1192,7 @@ public class CompactionManager implements CompactionManagerMBean
         logger.info("Anticompacting {}", anticompactionGroup);
         Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
 
-        File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
+        File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
         long repairedKeyCount = 0;
         long unrepairedKeyCount = 0;
         int nowInSec = FBUtilities.nowInSeconds();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index f5097af..47c8de8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -27,15 +27,21 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.service.ActiveRepairService;
 
 /**
  * Manages the compaction strategies.
@@ -181,10 +187,10 @@ public class CompactionStrategyManager implements INotificationConsumer
         startup();
     }
 
-    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
     {
-        cfs.getTracker().replaceFlushed(memtable, sstable);
-        if (sstable != null)
+        cfs.getTracker().replaceFlushed(memtable, sstables);
+        if (sstables != null && !sstables.isEmpty())
             CompactionManager.instance.submitBackground(cfs);
     }
 
@@ -235,16 +241,24 @@ public class CompactionStrategyManager implements INotificationConsumer
         return repaired.shouldDefragment();
     }
 
+    public Directories getDirectories()
+    {
+        assert repaired.getClass().equals(unrepaired.getClass());
+        return repaired.getDirectories();
+    }
 
     public synchronized void handleNotification(INotification notification, Object sender)
     {
         if (notification instanceof SSTableAddedNotification)
         {
             SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
-            if (flushedNotification.added.isRepaired())
-                repaired.addSSTable(flushedNotification.added);
-            else
-                unrepaired.addSSTable(flushedNotification.added);
+            for (SSTableReader sstable : flushedNotification.added)
+            {
+                if (sstable.isRepaired())
+                    repaired.addSSTable(sstable);
+                else
+                    unrepaired.addSSTable(sstable);
+            }
         }
         else if (notification instanceof SSTableListChangedNotification)
         {
@@ -484,4 +498,16 @@ public class CompactionStrategyManager implements INotificationConsumer
     {
         return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
     }
+
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
+    {
+        if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+        {
+            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+        }
+        else
+        {
+            return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 0bd6aae..1d96324 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -29,9 +28,9 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -43,7 +42,6 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -51,8 +49,8 @@ public class CompactionTask extends AbstractCompactionTask
 {
     protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
     protected final int gcBefore;
-    private final boolean offline;
-    private final boolean keepOriginals;
+    protected final boolean offline;
+    protected final boolean keepOriginals;
     protected static long totalBytesCompacted = 0;
     private CompactionExecutorStatsCollector collector;
 
@@ -154,7 +152,7 @@ public class CompactionTask extends AbstractCompactionTask
         {
             Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
 
-            List<SSTableReader> newSStables;
+            Collection<SSTableReader> newSStables;
 
             long[] mergedRowCounts;
 
@@ -173,7 +171,7 @@ public class CompactionTask extends AbstractCompactionTask
                 if (!controller.cfs.getCompactionStrategyManager().isActive)
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
+                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
                 {
                     estimatedKeys = writer.estimatedKeys();
                     while (ci.hasNext())
@@ -228,10 +226,11 @@ public class CompactionTask extends AbstractCompactionTask
 
     @Override
     public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                          Directories directories,
                                                           LifecycleTransaction transaction,
                                                           Set<SSTableReader> nonExpiredSSTables)
     {
-        return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, keepOriginals);
+        return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals);
     }
 
     public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
@@ -252,6 +251,11 @@ public class CompactionTask extends AbstractCompactionTask
         return mergeSummary.toString();
     }
 
+    protected Directories getDirectories()
+    {
+        return cfs.getDirectories();
+    }
+
     public static long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
     {
         long minRepairedAt= Long.MAX_VALUE;
@@ -264,7 +268,7 @@ public class CompactionTask extends AbstractCompactionTask
 
     protected void checkAvailableDiskSpace(long estimatedSSTables, long expectedWriteSize)
     {
-        while (!cfs.directories.hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
+        while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
         {
             if (!reduceScopeForLimitedSpace())
                 throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index 11d113d..eeb3615 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
 import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
@@ -42,12 +43,13 @@ public class LeveledCompactionTask extends CompactionTask
 
     @Override
     public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                          Directories directories,
                                                           LifecycleTransaction txn,
                                                           Set<SSTableReader> nonExpiredSSTables)
     {
         if (majorCompaction)
-            return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, false);
-        return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false);
+            return new MajorLeveledCompactionWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, false, false);
+        return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 1944364..3655a37 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -75,10 +75,11 @@ public class SSTableSplitter {
 
         @Override
         public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              Directories directories,
                                                               LifecycleTransaction txn,
                                                               Set<SSTableReader> nonExpiredSSTables)
         {
-            return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false);
+            return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 747b956..c437832 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.UUIDGen;
@@ -106,7 +105,7 @@ public class Scrubber implements Closeable
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
         // Calculate the expected compacted filesize
-        this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
+        this.destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
         if (destination == null)
             throw new IOException("disk full");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 2353aa3..05f446c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -342,10 +343,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
         @Override
         public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              Directories directories,
                                                               LifecycleTransaction txn,
                                                               Set<SSTableReader> nonExpiredSSTables)
         {
-            return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables);
+            return new SplittingSizeTieredCompactionWriter(cfs, directories, txn, nonExpiredSSTables);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 50e5a96..abc4107 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -18,14 +18,17 @@
 
 package org.apache.cassandra.db.compaction.writers;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.concurrent.Transactional;
@@ -38,6 +41,7 @@ import org.apache.cassandra.utils.concurrent.Transactional;
 public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional
 {
     protected final ColumnFamilyStore cfs;
+    protected final Directories directories;
     protected final Set<SSTableReader> nonExpiredSSTables;
     protected final long estimatedTotalKeys;
     protected final long maxAge;
@@ -45,35 +49,25 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
 
     protected final LifecycleTransaction txn;
     protected final SSTableRewriter sstableWriter;
+    private boolean isInitialized = false;
 
     public CompactionAwareWriter(ColumnFamilyStore cfs,
-                                 LifecycleTransaction txn,
-                                 Set<SSTableReader> nonExpiredSSTables)
-    {
-        this(cfs, txn, nonExpiredSSTables, false, false);
-    }
-
-    public CompactionAwareWriter(ColumnFamilyStore cfs,
+                                 Directories directories,
                                  LifecycleTransaction txn,
                                  Set<SSTableReader> nonExpiredSSTables,
                                  boolean offline,
                                  boolean keepOriginals)
     {
         this.cfs = cfs;
+        this.directories = directories;
         this.nonExpiredSSTables = nonExpiredSSTables;
         this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
         this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
         this.txn = txn;
         this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(keepOriginals);
-    }
 
-    /**
-     * Writes a partition in an implementation specific way
-     * @param partition the partition to append
-     * @return true if the partition was written, false otherwise
-     */
-    public abstract boolean append(UnfilteredRowIterator partition);
+    }
 
     @Override
     protected Throwable doAbort(Throwable accumulate)
@@ -98,7 +92,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
      * @return all the written sstables sstables
      */
     @Override
-    public List<SSTableReader> finish()
+    public Collection<SSTableReader> finish()
     {
         super.finish();
         return sstableWriter.finished();
@@ -112,12 +106,39 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         return estimatedTotalKeys;
     }
 
+    public final boolean append(UnfilteredRowIterator partition)
+    {
+        maybeSwitchWriter(partition.partitionKey());
+        return realAppend(partition);
+    }
+
+    protected abstract boolean realAppend(UnfilteredRowIterator partition);
+
+    /**
+     * Guaranteed to be called before the first call to realAppend.
+     * @param key partition key of the partition about to be appended (not read by this default implementation)
+     */
+    protected void maybeSwitchWriter(DecoratedKey key)
+    {
+        if (!isInitialized)
+            switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())));
+        isInitialized = true;
+    }
+
+    /**
+     * Implementations of this method should finish the current sstable writer and start writing to this directory.
+     *
+     * Called once before starting to append and then whenever we see a need to start writing to another directory.
+     * @param directory the data directory in which the next sstable writer should be created
+     */
+    protected abstract void switchCompactionLocation(Directories.DataDirectory directory);
+
     /**
      * The directories we can write to
      */
     public Directories getDirectories()
     {
-        return cfs.directories;
+        return directories;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index eb55d20..8b90224 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -18,13 +18,13 @@
 package org.apache.cassandra.db.compaction.writers;
 
 
-import java.io.File;
 import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -40,20 +40,28 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
 {
     protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
 
-    public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+    public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
-        this(cfs, txn, nonExpiredSSTables, false, false);
+        this(cfs, directories, txn, nonExpiredSSTables, false, false);
     }
 
     @SuppressWarnings("resource")
-    public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+    public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+    {
+        super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
+    }
+
+    @Override
+    public boolean realAppend(UnfilteredRowIterator partition)
+    {
+        return sstableWriter.append(partition) != null;
+    }
+
+    @Override
+    protected void switchCompactionLocation(Directories.DataDirectory directory)
     {
-        super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
-        logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
-        long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
-        File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))),
                                                     estimatedTotalKeys,
                                                     minRepairedAt,
                                                     cfs.metadata,
@@ -64,12 +72,6 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
     }
 
     @Override
-    public boolean append(UnfilteredRowIterator partition)
-    {
-        return sstableWriter.append(partition) != null;
-    }
-
-    @Override
     public long estimatedKeys()
     {
         return estimatedTotalKeys;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 73ce216..6d191f8 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -47,43 +48,32 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
     private int sstablesWritten = 0;
 
     public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
+                                        Directories directories,
                                         LifecycleTransaction txn,
                                         Set<SSTableReader> nonExpiredSSTables,
                                         long maxSSTableSize)
     {
-        this(cfs, txn, nonExpiredSSTables, maxSSTableSize, false, false);
+        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, false, false);
     }
 
     @SuppressWarnings("resource")
     public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
+                                        Directories directories,
                                         LifecycleTransaction txn,
                                         Set<SSTableReader> nonExpiredSSTables,
                                         long maxSSTableSize,
                                         boolean offline,
                                         boolean keepOriginals)
     {
-        super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
+        super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
         this.maxSSTableSize = maxSSTableSize;
         this.allSSTables = txn.originals();
         expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()));
-        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
-        long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
-        File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-
-        @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                    keysPerSSTable,
-                                                    minRepairedAt,
-                                                    cfs.metadata,
-                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                    txn);
-        sstableWriter.switchWriter(writer);
     }
 
     @Override
     @SuppressWarnings("resource")
-    public boolean append(UnfilteredRowIterator partition)
+    public boolean realAppend(UnfilteredRowIterator partition)
     {
         long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer();
         RowIndexEntry rie = sstableWriter.append(partition);
@@ -98,19 +88,25 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
             }
 
             averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
-            File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                        averageEstimatedKeysPerSSTable,
-                                                        minRepairedAt,
-                                                        cfs.metadata,
-                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
-                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                        txn);
-            sstableWriter.switchWriter(writer);
+            switchCompactionLocation(getWriteDirectory(expectedWriteSize));
             partitionsWritten = 0;
             sstablesWritten++;
         }
         return rie != null;
 
     }
+
+    public void switchCompactionLocation(Directories.DataDirectory directory)
+    {
+        File sstableDirectory = getDirectories().getLocationForDisk(directory);
+        @SuppressWarnings("resource")
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+                                                    averageEstimatedKeysPerSSTable,
+                                                    minRepairedAt,
+                                                    cfs.metadata,
+                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
+        sstableWriter.switchWriter(writer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 241af0d..142fe87 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.db.compaction.writers;
 
-import java.io.File;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -40,16 +40,18 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
     private final Set<SSTableReader> allSSTables;
 
     public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
+                                Directories directories,
                                 LifecycleTransaction txn,
                                 Set<SSTableReader> nonExpiredSSTables,
                                 long maxSSTableSize,
                                 int level)
     {
-        this(cfs, txn, nonExpiredSSTables, maxSSTableSize, level, false, false);
+        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, false, false);
     }
 
     @SuppressWarnings("resource")
     public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
+                                Directories directories,
                                 LifecycleTransaction txn,
                                 Set<SSTableReader> nonExpiredSSTables,
                                 long maxSSTableSize,
@@ -57,7 +59,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
                                 boolean offline,
                                 boolean keepOriginals)
     {
-        super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
+        super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
         this.allSSTables = txn.originals();
         this.level = level;
         this.maxSSTableSize = maxSSTableSize;
@@ -65,37 +67,30 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         expectedWriteSize = Math.min(maxSSTableSize, totalSize);
         estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
-        File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
+    }
+
+    @Override
+    public boolean realAppend(UnfilteredRowIterator partition)
+    {
+        RowIndexEntry rie = sstableWriter.append(partition);
+        if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
+            switchCompactionLocation(getWriteDirectory(expectedWriteSize));
+        return rie != null;
+    }
+
+    public void switchCompactionLocation(Directories.DataDirectory location)
+    {
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
                                                     estimatedTotalKeys / estimatedSSTables,
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
                                                     SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
                                                     txn);
-        sstableWriter.switchWriter(writer);
-    }
 
-    @Override
-    public boolean append(UnfilteredRowIterator partition)
-    {
-        RowIndexEntry rie = sstableWriter.append(partition);
-        if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
-        {
-            File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-            @SuppressWarnings("resource")
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                        estimatedTotalKeys / estimatedSSTables,
-                                                        minRepairedAt,
-                                                        cfs.metadata,
-                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
-                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                        txn);
+        sstableWriter.switchWriter(writer);
 
-            sstableWriter.switchWriter(writer);
-        }
-        return rie != null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 65924fa..07ca3d0 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -51,15 +52,15 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
     private long currentBytesToWrite;
     private int currentRatioIndex = 0;
 
-    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
-        this(cfs, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES);
+        this(cfs, directories, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES);
     }
 
     @SuppressWarnings("resource")
-    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
+    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
     {
-        super(cfs, txn, nonExpiredSSTables, false, false);
+        super(cfs, directories, txn, nonExpiredSSTables, false, false);
         this.allSSTables = txn.originals();
         totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
         double[] potentialRatios = new double[20];
@@ -81,43 +82,38 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
             }
         }
         ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex);
-        File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
         long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
         currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
-        @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                    currentPartitionsToWrite,
-                                                    minRepairedAt,
-                                                    cfs.metadata,
-                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                    txn);
-
-        sstableWriter.switchWriter(writer);
+        switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
         logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
     }
 
     @Override
-    public boolean append(UnfilteredRowIterator partition)
+    public boolean realAppend(UnfilteredRowIterator partition)
     {
         RowIndexEntry rie = sstableWriter.append(partition);
         if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
         {
             currentRatioIndex++;
             currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
-            long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
-            File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
-            @SuppressWarnings("resource")
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                        currentPartitionsToWrite,
-                                                        minRepairedAt,
-                                                        cfs.metadata,
-                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
-                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                        txn);
-            sstableWriter.switchWriter(writer);
-            logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
+            switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
         }
         return rie != null;
     }
+
+    public void switchCompactionLocation(Directories.DataDirectory location)
+    {
+        long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
+        @SuppressWarnings("resource")
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
+                                                    currentPartitionsToWrite,
+                                                    minRepairedAt,
+                                                    cfs.metadata,
+                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
+        logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
+        sstableWriter.switchWriter(writer);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index c6cb979..520b229 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -338,6 +338,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         return accumulate;
     }
 
+
     /**
      * update a reader: if !original, this is a reader that is being introduced by this transaction;
      * otherwise it must be in the originals() set, i.e. a reader guarded by this transaction
@@ -355,6 +356,14 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
             reader.setupOnline();
     }
 
+    public void update(Collection<SSTableReader> readers, boolean original)
+    {
+        for(SSTableReader reader: readers)
+        {
+            update(reader, original);
+        }
+    }
+
     /**
      * mark this reader as for obsoletion : on checkpoint() the reader will be removed from the live set
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 6f6aca9..d028493 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -186,11 +186,8 @@ public class Tracker
     public void addSSTables(Iterable<SSTableReader> sstables)
     {
         addInitialSSTables(sstables);
-        for (SSTableReader sstable : sstables)
-        {
-            maybeIncrementallyBackup(sstable);
-            notifyAdded(sstable);
-        }
+        maybeIncrementallyBackup(sstables);
+        notifyAdded(sstables);
     }
 
     /** (Re)initializes the tracker, purging all references. */
@@ -330,10 +327,10 @@ public class Tracker
         apply(View.markFlushing(memtable));
     }
 
-    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
     {
         assert !isDummy();
-        if (sstable == null)
+        if (sstables == null || sstables.isEmpty())
         {
             // sstable may be null if we flushed batchlog and nothing needed to be retained
             // if it's null, we don't care what state the cfstore is in, we just replace it and continue
@@ -341,16 +338,16 @@ public class Tracker
             return;
         }
 
-        sstable.setupOnline();
+        sstables.forEach(SSTableReader::setupOnline);
         // back up before creating a new Snapshot (which makes the new one eligible for compaction)
-        maybeIncrementallyBackup(sstable);
+        maybeIncrementallyBackup(sstables);
 
-        apply(View.replaceFlushed(memtable, sstable));
+        apply(View.replaceFlushed(memtable, sstables));
 
         Throwable fail;
-        fail = updateSizeTracking(emptySet(), singleton(sstable), null);
+        fail = updateSizeTracking(emptySet(), sstables, null);
         // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
-        fail = notifyAdded(sstable, fail);
+        fail = notifyAdded(sstables, fail);
 
         if (!isDummy() && !cfstore.isValid())
             dropSSTables();
@@ -377,13 +374,16 @@ public class Tracker
         return view.get().getUncompacting(candidates);
     }
 
-    public void maybeIncrementallyBackup(final SSTableReader sstable)
+    public void maybeIncrementallyBackup(final Iterable<SSTableReader> sstables)
     {
         if (!DatabaseDescriptor.isIncrementalBackupsEnabled())
             return;
 
-        File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
-        sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
+        for (SSTableReader sstable : sstables)
+        {
+            File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
+            sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
+        }
     }
 
     // NOTIFICATION
@@ -405,7 +405,7 @@ public class Tracker
         return accumulate;
     }
 
-    Throwable notifyAdded(SSTableReader added, Throwable accumulate)
+    Throwable notifyAdded(Iterable<SSTableReader> added, Throwable accumulate)
     {
         INotification notification = new SSTableAddedNotification(added);
         for (INotificationConsumer subscriber : subscribers)
@@ -422,7 +422,7 @@ public class Tracker
         return accumulate;
     }
 
-    public void notifyAdded(SSTableReader added)
+    public void notifyAdded(Iterable<SSTableReader> added)
     {
         maybeFail(notifyAdded(added, null));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 7ee0fdf..b62c7e3 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -310,7 +310,7 @@ public class View
     }
 
     // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
-    static Function<View, View> replaceFlushed(final Memtable memtable, final SSTableReader flushed)
+    static Function<View, View> replaceFlushed(final Memtable memtable, final Collection<SSTableReader> flushed)
     {
         return new Function<View, View>()
         {
@@ -323,7 +323,7 @@ public class View
                     return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
                                     view.compactingMap, view.intervalTree);
 
-                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
+                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
                 return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
                                 SSTableIntervalTree.build(sstableMap.keySet()));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index f4b4da8..d94b219 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -60,11 +60,12 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
 
     protected SSTableTxnWriter createWriter()
     {
-        return SSTableTxnWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
-                                    0,
-                                    ActiveRepairService.UNREPAIRED_SSTABLE,
-                                    0,
-                                    new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
+        return SSTableTxnWriter.create(metadata,
+                                       createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
+                                       0,
+                                       ActiveRepairService.UNREPAIRED_SSTABLE,
+                                       0,
+                                       new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
     }
 
     private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
new file mode 100644
index 0000000..0bb3721
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+public interface SSTableMultiWriter extends Transactional
+{
+
+    /**
+     * Writes a partition in an implementation-specific way
+     * @param partition the partition to append
+     * @return true if the partition was written, false otherwise
+     */
+    boolean append(UnfilteredRowIterator partition);
+
+    Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult);
+    Collection<SSTableReader> finish(boolean openResult);
+    Collection<SSTableReader> finished();
+
+    SSTableMultiWriter setOpenResult(boolean openResult);
+
+    String getFilename();
+    long getFilePointer();
+    UUID getCfId();
+
+    static void abortOrDie(SSTableMultiWriter writer)
+    {
+        Throwables.maybeFail(writer.abort(null));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 42bffb1..6e1ac38 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -18,13 +18,17 @@
 
 package org.apache.cassandra.io.sstable;
 
-import org.apache.cassandra.db.RowIndexEntry;
+import java.util.Collection;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
 /**
@@ -35,15 +39,15 @@ import org.apache.cassandra.utils.concurrent.Transactional;
 public class SSTableTxnWriter extends Transactional.AbstractTransactional implements Transactional
 {
     private final LifecycleTransaction txn;
-    private final SSTableWriter writer;
+    private final SSTableMultiWriter writer;
 
-    public SSTableTxnWriter(LifecycleTransaction txn, SSTableWriter writer)
+    public SSTableTxnWriter(LifecycleTransaction txn, SSTableMultiWriter writer)
     {
         this.txn = txn;
         this.writer = writer;
     }
 
-    public RowIndexEntry append(UnfilteredRowIterator iterator)
+    public boolean append(UnfilteredRowIterator iterator)
     {
         return writer.append(iterator);
     }
@@ -74,28 +78,43 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
         writer.prepareToCommit();
     }
 
-    public SSTableReader finish(boolean openResult)
+    public Collection<SSTableReader> finish(boolean openResult)
     {
         writer.setOpenResult(openResult);
         finish();
         return writer.finished();
     }
 
-    public static SSTableTxnWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+    public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
     {
         LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
-        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+        SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+        return new SSTableTxnWriter(txn, writer);
+    }
+
+    public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+    {
+        if (Keyspace.open(cfm.ksName).hasColumnFamilyStore(cfm.cfId))
+        {
+            ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId);
+            return create(cfs, descriptor, keyCount, repairedAt, sstableLevel, header);
+        }
+
+        // if the column family store does not exist, we create a new default SSTableMultiWriter to use:
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
+        MetadataCollector collector = new MetadataCollector(cfm.comparator).sstableLevel(sstableLevel);
+        SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, txn);
         return new SSTableTxnWriter(txn, writer);
     }
 
-    public static SSTableTxnWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+    public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
     {
         Descriptor desc = Descriptor.fromFilename(filename);
-        return create(desc, keyCount, repairedAt, sstableLevel, header);
+        return create(cfs, desc, keyCount, repairedAt, sstableLevel, header);
     }
 
-    public static SSTableTxnWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header)
+    public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, SerializationHeader header)
     {
-        return create(filename, keyCount, repairedAt, 0, header);
+        return create(cfs, filename, keyCount, repairedAt, 0, header);
     }
 }


Mime
View raw message