cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/2] cassandra git commit: Give compaction strategies more control over sstable creation
Date Thu, 20 Aug 2015 18:53:01 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 0d866456a -> 9ed272773


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
new file mode 100644
index 0000000..2112656
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+
+public class SimpleSSTableMultiWriter implements SSTableMultiWriter
+{
+    private final SSTableWriter writer;
+
+    private SimpleSSTableMultiWriter(SSTableWriter writer)
+    {
+        this.writer = writer;
+    }
+
+    public boolean append(UnfilteredRowIterator partition)
+    {
+        RowIndexEntry indexEntry = writer.append(partition);
+        return indexEntry != null;
+    }
+
+    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+    {
+        return Collections.singleton(writer.finish(repairedAt, maxDataAge, openResult));
+    }
+
+    public Collection<SSTableReader> finish(boolean openResult)
+    {
+        return Collections.singleton(writer.finish(openResult));
+    }
+
+    public Collection<SSTableReader> finished()
+    {
+        return Collections.singleton(writer.finished());
+    }
+
+    public SSTableMultiWriter setOpenResult(boolean openResult)
+    {
+        writer.setOpenResult(openResult);
+        return this;
+    }
+
+    public String getFilename()
+    {
+        return writer.getFilename();
+    }
+
+    public long getFilePointer()
+    {
+        return writer.getFilePointer();
+    }
+
+    public UUID getCfId()
+    {
+        return writer.metadata.cfId;
+    }
+
+    public Throwable commit(Throwable accumulate)
+    {
+        return writer.commit(accumulate);
+    }
+
+    public Throwable abort(Throwable accumulate)
+    {
+        return writer.abort(accumulate);
+    }
+
+    public void prepareToCommit()
+    {
+        writer.prepareToCommit();
+    }
+
+    public void close() throws Exception
+    {
+        writer.close();
+    }
+
+    public static SSTableMultiWriter create(Descriptor descriptor,
+                                            long keyCount,
+                                            long repairedAt,
+                                            CFMetaData cfm,
+                                            MetadataCollector metadataCollector,
+                                            SerializationHeader header,
+                                            LifecycleTransaction txn)
+    {
+        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, txn);
+        return new SimpleSSTableMultiWriter(writer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
index 15230ea..56d6130 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
@@ -21,8 +21,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 public class SSTableAddedNotification implements INotification
 {
-    public final SSTableReader added;
-    public SSTableAddedNotification(SSTableReader added)
+    public final Iterable<SSTableReader> added;
+    public SSTableAddedNotification(Iterable<SSTableReader> added)
     {
         this.added = added;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index a098786..d4b7283 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -36,12 +36,11 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
@@ -85,7 +84,7 @@ public class StreamReader
      * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
      */
     @SuppressWarnings("resource")
-    public SSTableWriter read(ReadableByteChannel channel) throws IOException
+    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
     {
         logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
         long totalSize = totalSize();
@@ -98,7 +97,7 @@ public class StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
+        SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
 
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
@@ -115,7 +114,8 @@ public class StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            SSTableMultiWriter.abortOrDie(writer);
+
             drain(dis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;
@@ -124,14 +124,16 @@ public class StreamReader
         }
     }
 
-    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
+    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
     {
-        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
+        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
-        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
 
-        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
+        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
+
+
+        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException
@@ -161,7 +163,7 @@ public class StreamReader
         return size;
     }
 
-    protected void writePartition(StreamDeserializer deserializer, SSTableWriter writer, ColumnFamilyStore cfs) throws IOException
+    protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer, ColumnFamilyStore cfs) throws IOException
     {
         DecoratedKey key = deserializer.newPartition();
         writer.append(deserializer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 7311069..2bcbbc1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -68,7 +69,7 @@ public class StreamReceiveTask extends StreamTask
     private boolean done = false;
 
     //  holds references to SSTables received
-    protected Collection<SSTableWriter> sstables;
+    protected Collection<SSTableMultiWriter> sstables;
 
     public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
     {
@@ -86,12 +87,12 @@ public class StreamReceiveTask extends StreamTask
      *
      * @param sstable SSTable file received.
      */
-    public synchronized void received(SSTableWriter sstable)
+    public synchronized void received(SSTableMultiWriter sstable)
     {
         if (done)
             return;
 
-        assert cfId.equals(sstable.metadata.cfId);
+        assert cfId.equals(sstable.getCfId());
 
         sstables.add(sstable);
         if (sstables.size() == totalFiles)
@@ -126,8 +127,8 @@ public class StreamReceiveTask extends StreamTask
             if (kscf == null)
             {
                 // schema was dropped during streaming
-                for (SSTableWriter writer : task.sstables)
-                    writer.abort();
+                task.sstables.forEach(SSTableMultiWriter::abortOrDie);
+
                 task.sstables.clear();
                 task.txn.abort();
                 return;
@@ -138,11 +139,11 @@ public class StreamReceiveTask extends StreamTask
             try
             {
                 List<SSTableReader> readers = new ArrayList<>();
-                for (SSTableWriter writer : task.sstables)
+                for (SSTableMultiWriter writer : task.sstables)
                 {
-                    SSTableReader reader = writer.finish(true);
-                    readers.add(reader);
-                    task.txn.update(reader, false);
+                    Collection<SSTableReader> newReaders = writer.finish(true);
+                    readers.addAll(newReaders);
+                    task.txn.update(newReaders, false);
                 }
 
                 task.sstables.clear();
@@ -211,8 +212,7 @@ public class StreamReceiveTask extends StreamTask
             return;
 
         done = true;
-        for (SSTableWriter writer : sstables)
-            writer.abort();
+        sstables.forEach(SSTableMultiWriter::abortOrDie);
         txn.abort();
         sstables.clear();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 205291b..f702e24 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,7 +25,7 @@ import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,7 +62,7 @@ public class CompressedStreamReader extends StreamReader
      */
     @Override
     @SuppressWarnings("resource")
-    public SSTableWriter read(ReadableByteChannel channel) throws IOException
+    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
     {
         logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
@@ -75,7 +75,7 @@ public class CompressedStreamReader extends StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
+        SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.compressedChecksumType());
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
@@ -102,7 +102,7 @@ public class CompressedStreamReader extends StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            SSTableMultiWriter.abortOrDie(writer);
             drain(cis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index bce9691..19f9e12 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -17,12 +17,11 @@
  */
 package org.apache.cassandra.streaming.messages;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -80,9 +79,9 @@ public class IncomingFileMessage extends StreamMessage
     };
 
     public FileMessageHeader header;
-    public SSTableWriter sstable;
+    public SSTableMultiWriter sstable;
 
-    public IncomingFileMessage(SSTableWriter sstable, FileMessageHeader header)
+    public IncomingFileMessage(SSTableMultiWriter sstable, FileMessageHeader header)
     {
         super(Type.FILE);
         this.header = header;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
index c8587d8..701bbc3 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
@@ -29,7 +29,6 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
@@ -69,7 +68,7 @@ public class SSTableExpiredBlockers
 
         Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
-        Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+        Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
         Set<SSTableReader> sstables = new HashSet<>();
         for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
index cb3cc5c..4f4e904 100644
--- a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
@@ -76,7 +76,7 @@ public class SSTableLevelResetter
             Keyspace keyspace = Keyspace.openWithoutSSTables(keyspaceName);
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnfamily);
             boolean foundSSTable = false;
-            for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().entrySet())
+            for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().entrySet())
             {
                 if (sstable.getValue().contains(Component.STATS))
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
index 95f516a..6554bd0 100644
--- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
+++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
@@ -30,17 +30,14 @@ import java.util.Set;
 
 import com.google.common.base.Throwables;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * Create a decent leveling for the given keyspace/column family
@@ -95,7 +92,7 @@ public class SSTableOfflineRelevel
 
         Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
-        Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+        Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
         Set<SSTableReader> sstables = new HashSet<>();
         for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index f64b8d9..f3a1a35 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -84,7 +84,7 @@ public class StandaloneScrubber
             String snapshotName = "pre-scrub-" + System.currentTimeMillis();
 
             OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
-            Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+            Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
 
             List<SSTableReader> sstables = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index 88e34b7..cf94c99 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -63,7 +63,7 @@ public class StandaloneUpgrader
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cf);
 
             OutputHandler handler = new OutputHandler.SystemOutput(false, options.debug);
-            Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW);
+            Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW);
             if (options.snapshot != null)
                 lister.onlyBackups(true).snapshots(options.snapshot);
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
index 0b17e39..3412329 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
@@ -18,10 +18,6 @@
  */
 package org.apache.cassandra.tools;
 
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
@@ -35,7 +31,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.commons.cli.*;
 
-import java.io.File;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -69,7 +64,7 @@ public class StandaloneVerifier
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cfName);
 
             OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
-            Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+            Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
 
             boolean extended = options.extended;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index 7db978e..d684e11 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -114,9 +114,9 @@ public class LongCompactionsTest
                     builder.newRow(String.valueOf(i)).add("val", String.valueOf(i));
                 rows.put(key, builder.build());
             }
-            SSTableReader sstable = SSTableUtils.prepare().write(rows);
-            sstables.add(sstable);
-            store.addSSTable(sstable);
+            Collection<SSTableReader> readers = SSTableUtils.prepare().write(rows);
+            sstables.addAll(readers);
+            store.addSSTables(readers);
         }
 
         // give garbage collection a bit of time to catch up

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index 249dd8d..6b50e49 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -86,7 +86,7 @@ public class MockSchema
 
     public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
     {
-        Descriptor descriptor = new Descriptor(cfs.directories.getDirectoryForNewSSTables(),
+        Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
                                                cfs.keyspace.getName(),
                                                cfs.getColumnFamilyName(),
                                                generation);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 78f9e3e..6840e2b 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -329,7 +329,7 @@ public class ColumnFamilyStoreTest
         assertTrue(snapshotDetails.containsKey("ephemeralSnapshot"));
         assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
 
-        ColumnFamilyStore.clearEphemeralSnapshots(cfs.directories);
+        ColumnFamilyStore.clearEphemeralSnapshots(cfs.getDirectories());
 
         snapshotDetails = cfs.getSnapshotDetails();
         assertEquals(1, snapshotDetails.size());
@@ -350,7 +350,7 @@ public class ColumnFamilyStoreTest
 
         for (int version = 1; version <= 2; ++version)
         {
-            Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version);
+            Descriptor existing = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version);
             Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version);
             for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
                 assertTrue("Cannot find backed-up file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 8889488..52b2aa8 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -324,12 +324,12 @@ public class ScrubTest
             String filename = cfs.getSSTablePath(tempDataDir);
             Descriptor desc = Descriptor.fromFilename(filename);
 
-            try (SSTableTxnWriter writer = SSTableTxnWriter.create(desc,
-                                                             keys.size(),
-                                                             0L,
-                                                             0,
-                                                             SerializationHeader.make(cfs.metadata,
-                                                                                      Collections.emptyList())))
+            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc,
+                                                                   keys.size(),
+                                                                   0L,
+                                                                   0,
+                                                                   SerializationHeader.make(cfs.metadata,
+                                                                                            Collections.emptyList())))
             {
 
                 for (String k : keys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 6dc5f53..a3167f9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.UpdateBuilder;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -162,10 +163,10 @@ public class AntiCompactionTest
 
     private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
     {
-        File dir = cfs.directories.getDirectoryForNewSSTables();
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
         String filename = cfs.getSSTablePath(dir);
 
-        try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
+        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
         {
             for (int i = 0; i < count; i++)
             {
@@ -175,7 +176,10 @@ public class AntiCompactionTest
                 writer.append(builder.build().unfilteredIterator());
 
             }
-            return writer.finish(true);
+            Collection<SSTableReader> sstables = writer.finish(true);
+            assertNotNull(sstables);
+            assertEquals(1, sstables.size());
+            return sstables.iterator().next();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 1b94a6b..68936f5 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -75,7 +75,7 @@ public class CompactionAwareWriterTest extends CQLTester
         populate(rowCount);
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
-        CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals());
+        CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals());
         int rows = compact(cfs, txn, writer);
         assertEquals(1, cfs.getLiveSSTables().size());
         assertEquals(rowCount, rows);
@@ -94,7 +94,7 @@ public class CompactionAwareWriterTest extends CQLTester
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
         int sstableSize = (int)beforeSize/10;
-        CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0);
+        CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, cfs.getDirectories(), txn, txn.originals(), sstableSize, 0);
         int rows = compact(cfs, txn, writer);
         assertEquals(10, cfs.getLiveSSTables().size());
         assertEquals(rowCount, rows);
@@ -111,7 +111,7 @@ public class CompactionAwareWriterTest extends CQLTester
         populate(rowCount);
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
-        CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), 0);
+        CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals(), 0);
         int rows = compact(cfs, txn, writer);
         long expectedSize = beforeSize / 2;
         List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getLiveSSTables());
@@ -147,7 +147,7 @@ public class CompactionAwareWriterTest extends CQLTester
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
         int sstableSize = (int)beforeSize/targetSSTableCount;
-        CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize);
+        CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals(), sstableSize);
         int rows = compact(cfs, txn, writer);
         assertEquals(targetSSTableCount, cfs.getLiveSSTables().size());
         int [] levelCounts = new int[5];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 9d5e5fc..8035ac4 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -346,7 +346,7 @@ public class LeveledCompactionStrategyTest
         assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
 
         unrepaired.removeSSTable(sstable2);
-        strategy.handleNotification(new SSTableAddedNotification(sstable2), this);
+        strategy.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable2)), this);
         assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
         assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 309e35a..9eff1b1 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -34,7 +34,6 @@ import junit.framework.Assert;
 import org.apache.cassandra.MockSchema;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
@@ -50,7 +49,6 @@ import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
@@ -134,11 +132,11 @@ public class RealTransactionsTest extends SchemaLoader
     {
         cfs.truncateBlocking();
 
-        String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
-        String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
+        String schema = "CREATE TABLE \"%s\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
+        String query = "INSERT INTO \"%s\".\"%s\" (key, name, val) VALUES (?, ?, ?)";
 
         try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
-                                                       .inDirectory(cfs.directories.getDirectoryForNewSSTables())
+                                                       .inDirectory(cfs.getDirectories().getDirectoryForNewSSTables())
                                                        .forTable(String.format(schema, cfs.keyspace.getName(), cfs.name))
                                                        .using(String.format(query, cfs.keyspace.getName(), cfs.name))
                                                        .build())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 3a943c4..9bcd9e7 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -30,6 +30,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -177,7 +178,7 @@ public class TrackerTest
             Assert.assertTrue(reader.isKeyCacheSetup());
 
         Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
-        Assert.assertEquals(3, listener.senders.size());
+        Assert.assertEquals(1, listener.senders.size());
         Assert.assertEquals(tracker, listener.senders.get(0));
         Assert.assertTrue(listener.received.get(0) instanceof SSTableAddedNotification);
         DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
@@ -296,10 +297,10 @@ public class TrackerTest
         Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
 
         SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
-        tracker.replaceFlushed(prev2, reader);
+        tracker.replaceFlushed(prev2, Collections.singleton(reader));
         Assert.assertEquals(1, tracker.getView().sstables.size());
         Assert.assertEquals(1, listener.received.size());
-        Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
+        Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
         listener.received.clear();
         Assert.assertTrue(reader.isKeyCacheSetup());
         Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
@@ -313,12 +314,12 @@ public class TrackerTest
         tracker.markFlushing(prev1);
         reader = MockSchema.sstable(0, 10, true, cfs);
         cfs.invalidate(false);
-        tracker.replaceFlushed(prev1, reader);
+        tracker.replaceFlushed(prev1, Collections.singleton(reader));
         Assert.assertEquals(0, tracker.getView().sstables.size());
         Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
         Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
         Assert.assertEquals(3, listener.received.size());
-        Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
+        Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
         Assert.assertTrue(listener.received.get(1) instanceof  SSTableDeletingNotification);
         Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
         DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
@@ -332,8 +333,8 @@ public class TrackerTest
         Tracker tracker = new Tracker(null, false);
         MockListener listener = new MockListener(false);
         tracker.subscribe(listener);
-        tracker.notifyAdded(r1);
-        Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added);
+        tracker.notifyAdded(singleton(r1));
+        Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added);
         listener.received.clear();
         tracker.notifyDeleting(r1);
         Assert.assertEquals(r1, ((SSTableDeletingNotification) listener.received.get(0)).deleting);
@@ -353,8 +354,8 @@ public class TrackerTest
         MockListener failListener = new MockListener(true);
         tracker.subscribe(failListener);
         tracker.subscribe(listener);
-        Assert.assertNotNull(tracker.notifyAdded(r1, null));
-        Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added);
+        Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null));
+        Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added);
         listener.received.clear();
         Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null));
         Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 40afa54..523c203 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -195,7 +195,7 @@ public class ViewTest
         Assert.assertEquals(memtable3, cur.getCurrentMemtable());
 
         SSTableReader sstable = MockSchema.sstable(1, cfs);
-        cur = View.replaceFlushed(memtable1, sstable).apply(cur);
+        cur = View.replaceFlushed(memtable1, Collections.singleton(sstable)).apply(cur);
         Assert.assertEquals(0, cur.flushingMemtables.size());
         Assert.assertEquals(1, cur.liveMemtables.size());
         Assert.assertEquals(memtable3, cur.getCurrentMemtable());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index 856ef7c..e1ab48f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -64,12 +64,12 @@ public class BigTableWriterTest extends AbstractTransactionalTest
 
         private TestableBTW()
         {
-            this(cfs.getSSTablePath(cfs.directories.getDirectoryForNewSSTables()));
+            this(cfs.getSSTablePath(cfs.getDirectories().getDirectoryForNewSSTables()));
         }
 
         private TestableBTW(String file)
         {
-            this(file, SSTableTxnWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
+            this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
         }
 
         private TestableBTW(String file, SSTableTxnWriter sw)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index ceeb369..a9165f7 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.FileUtils;
 
@@ -44,6 +45,7 @@ public class CQLSSTableWriterClientTest
     public void setUp()
     {
         this.testDirectory = Files.createTempDir();
+        Keyspace.setInitialized();
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index d9516cb..1c61f51 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -246,7 +246,7 @@ public class SSTableRewriterTest extends SchemaLoader
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
         truncate(cfs);
 
-        File dir = cfs.directories.getDirectoryForNewSSTables();
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
         LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, cfs.metadata);
         try (SSTableWriter writer = getWriter(cfs, dir, txn))
         {
@@ -941,10 +941,10 @@ public class SSTableRewriterTest extends SchemaLoader
         Set<SSTableReader> result = new LinkedHashSet<>();
         for (int f = 0 ; f < fileCount ; f++)
         {
-            File dir = cfs.directories.getDirectoryForNewSSTables();
+            File dir = cfs.getDirectories().getDirectoryForNewSSTables();
             String filename = cfs.getSSTablePath(dir);
 
-            try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
+            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
             {
                 int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
                 for ( ; i < end ; i++)
@@ -955,7 +955,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
                     writer.append(builder.build().unfilteredIterator());
                 }
-                result.add(writer.finish(true));
+                result.addAll(writer.finish(true));
             }
         }
         return result;
@@ -972,7 +972,7 @@ public class SSTableRewriterTest extends SchemaLoader
             liveDescriptors.add(sstable.descriptor.generation);
             spaceUsed += sstable.bytesOnDisk();
         }
-        for (File dir : cfs.directories.getCFDirectories())
+        for (File dir : cfs.getDirectories().getCFDirectories())
         {
             for (File f : dir.listFiles())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 6de5bb9..89c0d61 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -164,7 +165,7 @@ public class SSTableUtils
             return this;
         }
 
-        public SSTableReader write(Set<String> keys) throws IOException
+        public Collection<SSTableReader> write(Set<String> keys) throws IOException
         {
             Map<String, PartitionUpdate> map = new HashMap<>();
             for (String key : keys)
@@ -176,7 +177,7 @@ public class SSTableUtils
             return write(map);
         }
 
-        public SSTableReader write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException
+        public Collection<SSTableReader> write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException
         {
             final Iterator<Map.Entry<DecoratedKey, PartitionUpdate>> iter = sorted.entrySet().iterator();
             return write(sorted.size(), new Appender()
@@ -192,7 +193,7 @@ public class SSTableUtils
             });
         }
 
-        public SSTableReader write(Map<String, PartitionUpdate> entries) throws IOException
+        public Collection<SSTableReader> write(Map<String, PartitionUpdate> entries) throws IOException
         {
             SortedMap<DecoratedKey, PartitionUpdate> sorted = new TreeMap<>();
             for (Map.Entry<String, PartitionUpdate> entry : entries.entrySet())
@@ -201,18 +202,22 @@ public class SSTableUtils
             return write(sorted);
         }
 
-        public SSTableReader write(int expectedSize, Appender appender) throws IOException
+        public Collection<SSTableReader> write(int expectedSize, Appender appender) throws IOException
         {
             File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
-            SerializationHeader header = SerializationHeader.make(Schema.instance.getCFMetaData(ksname, cfname), Collections.EMPTY_LIST);
-            SSTableTxnWriter writer = SSTableTxnWriter.create(datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
+            CFMetaData cfm = Schema.instance.getCFMetaData(ksname, cfname);
+            ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.cfId);
+            SerializationHeader header = SerializationHeader.make(cfm, Collections.EMPTY_LIST);
+            SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
             while (appender.append(writer)) { /* pass */ }
-            SSTableReader reader = writer.finish(true);
+            Collection<SSTableReader> readers = writer.finish(true);
+
             // mark all components for removal
             if (cleanup)
-                for (Component component : reader.components)
-                    new File(reader.descriptor.filenameFor(component)).deleteOnExit();
-            return reader;
+                for (SSTableReader reader: readers)
+                    for (Component component : reader.components)
+                        new File(reader.descriptor.filenameFor(component)).deleteOnExit();
+            return readers;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/schema/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java
index c10865a..c3bcf1a 100644
--- a/test/unit/org/apache/cassandra/schema/DefsTest.java
+++ b/test/unit/org/apache/cassandra/schema/DefsTest.java
@@ -204,7 +204,7 @@ public class DefsTest
         ColumnFamilyStore store = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
         assertNotNull(store);
         store.forceBlockingFlush();
-        assertTrue(store.directories.sstableLister(Directories.OnTxnErr.THROW).list().size() > 0);
+        assertTrue(store.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().size() > 0);
 
         MigrationManager.announceColumnFamilyDrop(ks.name, cfm.cfName);
 
@@ -226,7 +226,7 @@ public class DefsTest
 
         // verify that the files are gone.
         Supplier<Object> lambda = () -> {
-            for (File file : store.directories.sstableLister(Directories.OnTxnErr.THROW).listFiles())
+            for (File file : store.getDirectories().sstableLister(Directories.OnTxnErr.THROW).listFiles())
             {
                 if (file.getPath().endsWith("Data.db") && !new File(file.getPath().replace("Data.db", "Compacted")).exists())
                     return false;
@@ -275,7 +275,7 @@ public class DefsTest
         ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
         assertNotNull(cfs);
         cfs.forceBlockingFlush();
-        assertTrue(!cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().isEmpty());
+        assertTrue(!cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().isEmpty());
 
         MigrationManager.announceKeyspaceDrop(ks.name);
 


Mime
View raw message