cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject cassandra git commit: Put CQLSSTableWriter back to the old interface/behavior before CASSANDRA-11844
Date Mon, 12 Sep 2016 15:49:20 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 3f49c328f -> 0026e4eee


Put CQLSSTableWriter back to the old interface/behavior before CASSANDRA-11844

Patch by Jeremiah Jordan; reviewed by Jake Luciani for CASSANDRA-12551


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0026e4ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0026e4ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0026e4ee

Branch: refs/heads/trunk
Commit: 0026e4eeec23367c74c44b23a9586562b939f6f8
Parents: 3f49c32
Author: Jeremiah D Jordan <jeremiah@datastax.com>
Authored: Fri Sep 2 10:06:39 2016 -0500
Committer: T Jake Luciani <jake@apache.org>
Committed: Mon Sep 12 11:48:45 2016 -0400

----------------------------------------------------------------------
 .../hadoop/cql3/CqlBulkRecordWriter.java        |   4 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |  29 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  | 148 ++--
 .../cassandra/io/sstable/SSTableLoader.java     |  16 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |  12 +-
 .../io/sstable/SSTableSimpleWriter.java         |   8 +-
 .../cassandra/io/sstable/SSTableTxnWriter.java  |  10 +-
 .../cassandra/streaming/LongStreamingTest.java  |   7 +-
 .../db/lifecycle/RealTransactionsTest.java      |   4 +-
 .../io/sstable/CQLSSTableWriterClientTest.java  |  16 +-
 .../io/sstable/CQLSSTableWriterTest.java        |  37 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |  38 +-
 .../io/sstable/StressCQLSSTableWriter.java      | 672 +++++++++++++++++++
 .../cassandra/stress/CompactionStress.java      |  20 +-
 .../apache/cassandra/stress/StressProfile.java  |   2 +-
 .../operations/userdefined/SchemaInsert.java    |  20 +-
 16 files changed, 813 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index bd157e9..2ed37ee 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -75,7 +75,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
     protected final Configuration conf;
     protected final int maxFailures;
     protected final int bufferSize;
-    protected CQLSSTableWriter writer;
+    protected Closeable writer;
     protected SSTableLoader loader;
     protected Progressable progress;
     protected TaskAttemptContext context;
@@ -174,7 +174,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
             ExternalClient externalClient = new ExternalClient(conf);
             externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
 
-            loader = new SSTableLoader(writer.getInnermostDirectory(), externalClient, new NullOutputHandler())
+            loader = new SSTableLoader(outputDir, externalClient, new NullOutputHandler())
             {
                 @Override
                 public void onSuccess(StreamState finalState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index f989878..9a8f968 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -22,13 +22,15 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.Closeable;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.Pair;
@@ -38,17 +40,17 @@ import org.apache.cassandra.utils.Pair;
  */
 abstract class AbstractSSTableSimpleWriter implements Closeable
 {
-    protected final ColumnFamilyStore cfs;
-    protected final IPartitioner partitioner;
+    protected final File directory;
+    protected final CFMetaData metadata;
     protected final PartitionColumns columns;
     protected SSTableFormat.Type formatType = SSTableFormat.Type.current();
     protected static AtomicInteger generation = new AtomicInteger(0);
     protected boolean makeRangeAware = false;
 
-    protected AbstractSSTableSimpleWriter(ColumnFamilyStore cfs, IPartitioner partitioner,  PartitionColumns columns)
+    protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
     {
-        this.cfs = cfs;
-        this.partitioner = partitioner;
+        this.metadata = metadata;
+        this.directory = directory;
         this.columns = columns;
     }
 
@@ -65,17 +67,18 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
 
     protected SSTableTxnWriter createWriter()
     {
-        SerializationHeader header = new SerializationHeader(true, cfs.metadata, columns, EncodingStats.NO_STATS);
+        SerializationHeader header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS);
 
         if (makeRangeAware)
-            return SSTableTxnWriter.createRangeAware(cfs, 0,  ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header);
+            return SSTableTxnWriter.createRangeAware(metadata, 0,  ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header);
 
-        return SSTableTxnWriter.create(cfs,
-                                       createDescriptor(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata.ksName, cfs.metadata.cfName, formatType),
+        return SSTableTxnWriter.create(metadata,
+                                       createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
                                        0,
                                        ActiveRepairService.UNREPAIRED_SSTABLE,
                                        0,
-                                       header);
+                                       header,
+                                       Collections.emptySet());
     }
 
     private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
@@ -115,7 +118,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
 
     PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
     {
-        return getUpdateFor(partitioner.decorateKey(key));
+        return getUpdateFor(metadata.decorateKey(key));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index e668b75..8a9d01d 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -21,14 +21,17 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
 import java.util.stream.Collectors;
 
 import com.datastax.driver.core.ProtocolVersion;
 import com.datastax.driver.core.TypeCodec;
-import com.sun.org.apache.xpath.internal.operations.Bool;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.ColumnSpecification;
@@ -40,7 +43,8 @@ import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.cql3.statements.CreateTypeStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.dht.IPartitioner;
@@ -49,12 +53,10 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.locator.SimpleSnitch;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -331,30 +333,14 @@ public class CQLSSTableWriter implements Closeable
         return codec.serialize(value, ProtocolVersion.NEWEST_SUPPORTED);
     }
     /**
-     * The writer loads data in directories corresponding to how they laid out on the server.
-     * <p>
-     * {keyspace}/{table-cfid}/
-     *
-     * This method can be used to fetch the innermost directory with the sstable components
-     * @return The directory containing the sstable components
-     */
-    public File getInnermostDirectory()
-    {
-        return writer.cfs.getDirectories().getDirectoryForNewSSTables();
-    }
-
-    /**
      * A Builder for a CQLSSTableWriter object.
      */
     public static class Builder
     {
-        private final List<File> directoryList;
-        private ColumnFamilyStore cfs;
+        private File directory;
 
         protected SSTableFormat.Type formatType = null;
 
-        private Boolean makeRangeAware = false;
-
         private CreateTableStatement.RawStatement schemaStatement;
         private final List<CreateTypeStatement> typeStatements;
         private UpdateStatement.ParsedInsert insertStatement;
@@ -363,10 +349,8 @@ public class CQLSSTableWriter implements Closeable
         private boolean sorted = false;
         private long bufferSizeInMB = 128;
 
-        protected Builder()
-        {
+        protected Builder() {
             this.typeStatements = new ArrayList<>();
-            this.directoryList = new ArrayList<>();
         }
 
         /**
@@ -389,7 +373,7 @@ public class CQLSSTableWriter implements Closeable
          * <p>
          * This is a mandatory option.
          *
-         * @param directory the directory to use, which should exist and be writable.
+         * @param directory the directory to use, which should exists and be writable.
          * @return this builder.
          *
          * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
@@ -401,29 +385,10 @@ public class CQLSSTableWriter implements Closeable
             if (!directory.canWrite())
                 throw new IllegalArgumentException(directory + " exists but is not writable");
 
-            directoryList.add(directory);
+            this.directory = directory;
             return this;
         }
 
-        /**
-         * A pre-instanciated ColumnFamilyStore
-         * <p>
-         * This is can be used in place of inDirectory and forTable
-         *
-         * @see #inDirectory(File)
-         *
-         * @param cfs the list of directories to use, which should exist and be writable.
-         * @return this builder.
-         *
-         * @throws IllegalArgumentException if a directory doesn't exist or is not writable.
-         */
-        public Builder withCfs(ColumnFamilyStore cfs)
-        {
-            this.cfs = cfs;
-            return this;
-        }
-
-
         public Builder withType(String typeDefinition) throws SyntaxException
         {
             typeStatements.add(parseStatement(typeDefinition, CreateTypeStatement.class, "CREATE TYPE"));
@@ -466,20 +431,6 @@ public class CQLSSTableWriter implements Closeable
             return this;
         }
 
-
-        /**
-         * Specify if the sstable writer should be vnode range aware.
-         * This will create a sstable per vnode range.
-         *
-         * @param makeRangeAware
-         * @return
-         */
-        public Builder rangeAware(boolean makeRangeAware)
-        {
-            this.makeRangeAware = makeRangeAware;
-            return this;
-        }
-
         /**
          * The INSERT statement defining the order of the values to add for a given CQL row.
          * <p>
@@ -548,36 +499,36 @@ public class CQLSSTableWriter implements Closeable
         @SuppressWarnings("resource")
         public CQLSSTableWriter build()
         {
-            if (directoryList.isEmpty() && cfs == null)
-                throw new IllegalStateException("No output directories specified, you should provide a directory with inDirectory()");
-            if (schemaStatement == null && cfs == null)
+            if (directory == null)
+                throw new IllegalStateException("No ouptut directory specified, you should provide a directory with inDirectory()");
+            if (schemaStatement == null)
                 throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()");
             if (insertStatement == null)
                 throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
 
             synchronized (CQLSSTableWriter.class)
             {
-                if (cfs == null)
-                    cfs = createOfflineTable(schemaStatement, typeStatements, directoryList);
+                String keyspace = schemaStatement.keyspace();
 
-                if (partitioner == null)
-                    partitioner = cfs.getPartitioner();
+                if (Schema.instance.getKSMetaData(keyspace) == null)
+                    Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
 
+                createTypes(keyspace);
+                CFMetaData cfMetaData = createTable(keyspace);
                 Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert();
+
                 AbstractSSTableSimpleWriter writer = sorted
-                                                     ? new SSTableSimpleWriter(cfs, partitioner, preparedInsert.left.updatedColumns())
-                                                     : new SSTableSimpleUnsortedWriter(cfs, partitioner, preparedInsert.left.updatedColumns(), bufferSizeInMB);
+                                                     ? new SSTableSimpleWriter(directory, cfMetaData, preparedInsert.left.updatedColumns())
+                                                     : new SSTableSimpleUnsortedWriter(directory, cfMetaData, preparedInsert.left.updatedColumns(), bufferSizeInMB);
 
                 if (formatType != null)
                     writer.setSSTableFormatType(formatType);
 
-                writer.setRangeAwareWriting(makeRangeAware);
-
                 return new CQLSSTableWriter(writer, preparedInsert.left, preparedInsert.right);
             }
         }
 
-        private static void createTypes(String keyspace, List<CreateTypeStatement> typeStatements)
+        private void createTypes(String keyspace)
         {
             KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
             Types.RawBuilder builder = Types.rawBuilder(keyspace);
@@ -587,50 +538,31 @@ public class CQLSSTableWriter implements Closeable
             ksm = ksm.withSwapped(builder.build());
             Schema.instance.setKeyspaceMetadata(ksm);
         }
-
-        public static ColumnFamilyStore createOfflineTable(String schema, List<File> directoryList)
-        {
-            return createOfflineTable(parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"), Collections.EMPTY_LIST, directoryList);
-        }
-
         /**
          * Creates the table according to schema statement
-         * with specified data directories
+         *
+         * @param keyspace name of the keyspace where table should be created
          */
-        public static ColumnFamilyStore createOfflineTable(CreateTableStatement.RawStatement schemaStatement, List<CreateTypeStatement> typeStatements, List<File> directoryList)
+        private CFMetaData createTable(String keyspace)
         {
-            String keyspace = schemaStatement.keyspace();
-
-            if (Schema.instance.getKSMetaData(keyspace) == null)
-                Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
-
-            createTypes(keyspace, typeStatements);
-
             KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
 
             CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily());
-            assert cfMetaData == null;
-
-            CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement;
-            statement.validate(ClientState.forInternalCalls());
-
-            //Build metatdata with a portable cfId
-            cfMetaData = statement.metadataBuilder()
-                                  .withId(CFMetaData.generateLegacyCfId(keyspace, statement.columnFamily()))
-                                  .build()
-                                  .params(statement.params());
-
-            Keyspace.setInitialized();
-            Directories directories = new Directories(cfMetaData, directoryList.stream().map(Directories.DataDirectory::new).collect(Collectors.toList()));
+            if (cfMetaData == null)
+            {
+                CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement;
+                statement.validate(ClientState.forInternalCalls());
 
-            Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
-            ColumnFamilyStore cfs =  ColumnFamilyStore.createColumnFamilyStore(ks, cfMetaData.cfName, cfMetaData, directories, false, false, true);
+                cfMetaData = statement.getCFMetaData();
 
-            ks.initCfCustom(cfs);
-            Schema.instance.load(cfs.metadata);
-            Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfs.metadata)));
+                Schema.instance.load(cfMetaData);
+                Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData)));
+            }
 
-            return cfs;
+            if (partitioner != null)
+                return cfMetaData.copy(partitioner);
+            else
+                return cfMetaData;
         }
 
         /**
@@ -655,7 +587,7 @@ public class CQLSSTableWriter implements Closeable
         }
     }
 
-    public static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type)
+    private static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 15dd925..043f6fa 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -56,25 +56,13 @@ public class SSTableLoader implements StreamEventHandler
 
     public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
     {
-        this(directory, client, outputHandler, directory.getParentFile().getName(), 1);
+        this(directory, client, outputHandler, 1);
     }
 
-
     public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost)
     {
-        this(directory, client, outputHandler, directory.getParentFile().getName(), connectionsPerHost);
-    }
-
-    public SSTableLoader(File directory, Client client, OutputHandler outputHandler, String keyspace)
-    {
-        this(directory, client, outputHandler, keyspace, 1);
-    }
-
-
-    public SSTableLoader(File directory, Client client, OutputHandler outputHandler, String keyspace, int connectionsPerHost)
-    {
         this.directory = directory;
-        this.keyspace = keyspace;
+        this.keyspace = directory.getParentFile().getName();
         this.client = client;
         this.outputHandler = outputHandler;
         this.connectionsPerHost = connectionsPerHost;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 2563f26..fa88817 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
@@ -35,7 +34,6 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -62,11 +60,11 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
     private final DiskWriter diskWriter = new DiskWriter();
 
-    SSTableSimpleUnsortedWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB)
+    SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB)
     {
-        super(cfs, partitioner, columns);
+        super(directory, metadata, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
-        this.header = new SerializationHeader(true, cfs.metadata, columns, EncodingStats.NO_STATS);
+        this.header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS);
         diskWriter.start();
     }
 
@@ -112,7 +110,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
 
     private PartitionUpdate createPartitionUpdate(DecoratedKey key)
     {
-        return new PartitionUpdate(cfs.metadata, key, columns, 4)
+        return new PartitionUpdate(metadata, key, columns, 4)
         {
             @Override
             public void add(Row row)
@@ -206,7 +204,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
                     if (b == SENTINEL)
                         return;
 
-                    try (SSTableTxnWriter writer = createWriter())
+                        try (SSTableTxnWriter writer = createWriter())
                     {
                         for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet())
                             writer.append(entry.getValue().unfilteredIterator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 2f6dd33..7fbd79d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -19,14 +19,12 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collection;
 
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
 
 /**
  * A SSTable writer that assumes rows are in (partitioner) sorted order.
@@ -45,9 +43,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 
     private SSTableTxnWriter writer;
 
-    protected SSTableSimpleWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns)
+    protected SSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
     {
-        super(cfs, partitioner, columns);
+        super(directory, metadata, columns);
     }
 
     private SSTableTxnWriter getOrCreateWriter()
@@ -69,7 +67,7 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
             if (update != null)
                 writePartition(update);
             currentKey = key;
-            update = new PartitionUpdate(cfs.metadata, currentKey, columns, 4);
+            update = new PartitionUpdate(metadata, currentKey, columns, 4);
         }
 
         assert update != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 28ca4c4..015c5bb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -106,8 +107,15 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
 
 
     @SuppressWarnings("resource") // log and writer closed during doPostCleanup
-    public static SSTableTxnWriter createRangeAware(ColumnFamilyStore cfs, long keyCount, long repairedAt, SSTableFormat.Type type, int sstableLevel, SerializationHeader header)
+    public static SSTableTxnWriter createRangeAware(CFMetaData cfm,
+                                                    long keyCount,
+                                                    long repairedAt,
+                                                    SSTableFormat.Type type,
+                                                    int sstableLevel,
+                                                    SerializationHeader header)
     {
+
+        ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
         LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
         SSTableMultiWriter writer;
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index 51b049d..1340224 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -92,7 +91,7 @@ public class LongStreamingTest
         writer.close();
         System.err.println(String.format("Writer finished after %d seconds....", TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start)));
 
-        File[] dataFiles = writer.getInnermostDirectory().listFiles((dir, name) -> name.endsWith("-Data.db"));
+        File[] dataFiles = dataDir.listFiles((dir, name) -> name.endsWith("-Data.db"));
         long dataSize = 0l;
         for (File file : dataFiles)
         {
@@ -100,7 +99,7 @@ public class LongStreamingTest
             dataSize += file.length();
         }
 
-        SSTableLoader loader = new SSTableLoader(writer.getInnermostDirectory(), new SSTableLoader.Client()
+        SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
         {
             private String ks;
             public void init(String keyspace)
@@ -127,7 +126,7 @@ public class LongStreamingTest
 
 
         //Stream again
-        loader = new SSTableLoader(writer.getInnermostDirectory(), new SSTableLoader.Client()
+        loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
         {
             private String ks;
             public void init(String keyspace)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 515ce18..595610e 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -131,10 +131,12 @@ public class RealTransactionsTest extends SchemaLoader
     {
         cfs.truncateBlocking();
 
+        String schema = "CREATE TABLE \"%s\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
         String query = "INSERT INTO \"%s\".\"%s\" (key, name, val) VALUES (?, ?, ?)";
 
         try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
-                                                       .withCfs(cfs)
+                                                       .inDirectory(cfs.getDirectories().getDirectoryForNewSSTables())
+                                                       .forTable(String.format(schema, cfs.keyspace.getName(), cfs.name))
                                                        .using(String.format(query, cfs.keyspace.getName(), cfs.name))
                                                        .build())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index 9502dfa..8025861 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -22,17 +22,15 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 
 import com.google.common.io.Files;
-import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.tools.Util;
 
 import static org.junit.Assert.assertEquals;
 
@@ -77,10 +75,16 @@ public class CQLSSTableWriterClientTest
         writer.close();
         writer2.close();
 
-        FilenameFilter filter = (dir, name) -> name.endsWith("-Data.db");
+        FilenameFilter filter = new FilenameFilter()
+        {
+            @Override
+            public boolean accept(File dir, String name)
+            {
+                return name.endsWith("-Data.db");
+            }
+        };
 
-        File[] dataFiles = (File[])ArrayUtils.addAll(writer2.getInnermostDirectory().listFiles(filter),
-                                                     writer.getInnermostDirectory().listFiles(filter));
+        File[] dataFiles = this.testDirectory.listFiles(filter);
         assertEquals(2, dataFiles.length);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 56e62ee..3c80b9e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -33,11 +33,9 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.auth.AuthConfig;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.UDHelper;
-import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
@@ -97,7 +95,7 @@ public class CQLSSTableWriterTest
 
             writer.close();
 
-            loadSSTables(writer.getInnermostDirectory(), KS);
+            loadSSTables(dataDir, KS);
 
             UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;");
             assertEquals(4, rs.size());
@@ -187,7 +185,7 @@ public class CQLSSTableWriterTest
                 return name.endsWith("-Data.db");
             }
         };
-        assert writer.getInnermostDirectory().list(filterDataFiles).length > 1 : Arrays.toString(writer.getInnermostDirectory().list(filterDataFiles));
+        assert dataDir.list(filterDataFiles).length > 1 : Arrays.toString(dataDir.list(filterDataFiles));
     }
 
 
@@ -221,22 +219,28 @@ public class CQLSSTableWriterTest
     private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
     private class WriterThread extends Thread
     {
+        private final File dataDir;
         private final int id;
-        private final ColumnFamilyStore cfs;
         public volatile Exception exception;
 
-        public WriterThread(ColumnFamilyStore cfs, int id)
+        public WriterThread(File dataDir, int id)
         {
-            this.cfs = cfs;
+            this.dataDir = dataDir;
             this.id = id;
         }
 
         @Override
         public void run()
         {
+            String schema = "CREATE TABLE cql_keyspace2.table2 ("
+                    + "  k int,"
+                    + "  v int,"
+                    + "  PRIMARY KEY (k, v)"
+                    + ")";
             String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)";
             CQLSSTableWriter writer = CQLSSTableWriter.builder()
-                    .withCfs(cfs)
+                    .inDirectory(dataDir)
+                    .forTable(schema)
                     .using(insert).build();
 
             try
@@ -264,17 +268,10 @@ public class CQLSSTableWriterTest
         File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
         assert dataDir.mkdirs();
 
-        String schema = "CREATE TABLE cql_keyspace2.table2 ("
-                        + "  k int,"
-                        + "  v int,"
-                        + "  PRIMARY KEY (k, v)"
-                        + ")";
-        ColumnFamilyStore cfs = CQLSSTableWriter.Builder.createOfflineTable(schema, Collections.singletonList(dataDir));
-
         WriterThread[] threads = new WriterThread[5];
         for (int i = 0; i < threads.length; i++)
         {
-            WriterThread thread = new WriterThread(cfs, i);
+            WriterThread thread = new WriterThread(dataDir, i);
             threads[i] = thread;
             thread.start();
         }
@@ -289,7 +286,7 @@ public class CQLSSTableWriterTest
             }
         }
 
-        loadSSTables(cfs.getDirectories().getDirectoryForNewSSTables(), KS);
+        loadSSTables(dataDir, KS);
 
         UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
         assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
@@ -341,7 +338,7 @@ public class CQLSSTableWriterTest
         }
 
         writer.close();
-        loadSSTables(writer.getInnermostDirectory(), KS);
+        loadSSTables(dataDir, KS);
 
         UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
         TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type));
@@ -412,7 +409,7 @@ public class CQLSSTableWriterTest
         }
 
         writer.close();
-        loadSSTables(writer.getInnermostDirectory(), KS);
+        loadSSTables(dataDir, KS);
 
         UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
 
@@ -503,7 +500,7 @@ public class CQLSSTableWriterTest
         writer.addRow(5, 5, 5, "5");
 
         writer.close();
-        loadSSTables(writer.getInnermostDirectory(), KS);
+        loadSSTables(dataDir, KS);
 
         UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
         Iterator<UntypedResultSet.Row> iter = resultSet.iterator();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index ba7571f..72c7467 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
@@ -34,8 +33,6 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.AsciiType;
@@ -125,8 +122,6 @@ public class SSTableLoaderTest
         String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
         String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
 
-
-        File outputDir;
         try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                        .inDirectory(dataDir)
                                                        .forTable(String.format(schema, KEYSPACE1, CF_STANDARD1))
@@ -134,22 +129,22 @@ public class SSTableLoaderTest
                                                        .build())
         {
             writer.addRow("key1", "col1", "100");
-            outputDir = writer.getInnermostDirectory();
         }
 
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+        cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
+
         final CountDownLatch latch = new CountDownLatch(1);
-        SSTableLoader loader = new SSTableLoader(outputDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
+        SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
         loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
-        UntypedResultSet rs = QueryProcessor.executeInternal(String.format("SELECT * FROM %s.%s;", KEYSPACE1, CF_STANDARD1));
-
-        assertEquals(1, rs.size());
-
-        Iterator<UntypedResultSet.Row> iter = rs.iterator();
-        UntypedResultSet.Row row;
+        List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
 
-        row = iter.next();
-        assertEquals("key1", row.getString("key"));
+        assertEquals(1, partitions.size());
+        assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
+        assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
+                                                                   .getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val")))
+                                                                   .value());
 
         // The stream future is signalled when the work is complete but before releasing references. Wait for release
         // before cleanup (CASSANDRA-10118).
@@ -165,9 +160,8 @@ public class SSTableLoaderTest
         //make sure we have no tables...
         assertTrue(dataDir.listFiles().length == 0);
 
-        //Since this is running in the same jvm we need to put it in a tmp keyspace
-        String schema = "CREATE TABLE \"%stmp\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name)) with compression = {}";
-        String query = "INSERT INTO \"%stmp\".\"%s\" (key, name, val) VALUES (?, ?, ?)";
+        String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
+        String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
 
         CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                   .inDirectory(dataDir)
@@ -176,7 +170,7 @@ public class SSTableLoaderTest
                                                   .withBufferSizeInMB(1)
                                                   .build();
 
-        int NB_PARTITIONS = 4200; // Enough to write >1MB and get at least one completed sstable before we've closed the writer
+        int NB_PARTITIONS = 5000; // Enough to write >1MB and get at least one completed sstable before we've closed the writer
 
         for (int i = 0; i < NB_PARTITIONS; i++)
         {
@@ -188,11 +182,11 @@ public class SSTableLoaderTest
         cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
 
         //make sure we have some tables...
-        assertTrue(writer.getInnermostDirectory().listFiles().length > 0);
+        assertTrue(dataDir.listFiles().length > 0);
 
         final CountDownLatch latch = new CountDownLatch(2);
         //writer is still open so loader should not load anything
-        SSTableLoader loader = new SSTableLoader(writer.getInnermostDirectory(), new TestClient(), new OutputHandler.SystemOutput(false, false), KEYSPACE1);
+        SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
         loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
         List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
@@ -202,7 +196,7 @@ public class SSTableLoaderTest
         // now we complete the write and the second loader should load the last sstable as well
         writer.close();
 
-        loader = new SSTableLoader(writer.getInnermostDirectory(), new TestClient(), new OutputHandler.SystemOutput(false, false), KEYSPACE1);
+        loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
         loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
         partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
new file mode 100644
index 0000000..4fe05a8
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.TypeCodec;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.functions.UDHelper;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.CreateTypeStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.cql3.statements.UpdateStatement;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Types;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Utility to write SSTables.
+ * <p>
+ * Typical usage looks like:
+ * <pre>
+ *   String type = CREATE TYPE myKs.myType (a int, b int)";
+ *   String schema = "CREATE TABLE myKs.myTable ("
+ *                 + "  k int PRIMARY KEY,"
+ *                 + "  v1 text,"
+ *                 + "  v2 int,"
+ *                 + "  v3 myType,"
+ *                 + ")";
+ *   String insert = "INSERT INTO myKs.myTable (k, v1, v2, v3) VALUES (?, ?, ?, ?)";
+ *
+ *   // Creates a new writer. You need to provide at least the directory where to write the created sstable,
+ *   // the schema for the sstable to write and a (prepared) insert statement to use. If you do not use the
+ *   // default partitioner (Murmur3Partitioner), you will also need to provide the partitioner in use, see
+ *   // StressCQLSSTableWriter.Builder for more details on the available options.
+ *   StressCQLSSTableWriter writer = StressCQLSSTableWriter.builder()
+ *                                             .inDirectory("path/to/directory")
+ *                                             .withType(type)
+ *                                             .forTable(schema)
+ *                                             .using(insert).build();
+ *
+ *   UserType myType = writer.getUDType("myType");
+ *   // Adds a nember of rows to the resulting sstable
+ *   writer.addRow(0, "test1", 24, myType.newValue().setInt("a", 10).setInt("b", 20));
+ *   writer.addRow(1, "test2", null, null);
+ *   writer.addRow(2, "test3", 42, myType.newValue().setInt("a", 30).setInt("b", 40));
+ *
+ *   // Close the writer, finalizing the sstable
+ *   writer.close();
+ * </pre>
+ *
+ * Please note that {@code StressCQLSSTableWriter} is <b>not</b> thread-safe (multiple threads cannot access the
+ * same instance). It is however safe to use multiple instances in parallel (even if those instance write
+ * sstables for the same table).
+ */
+public class StressCQLSSTableWriter implements Closeable
+{
+    public static final ByteBuffer UNSET_VALUE = ByteBufferUtil.UNSET_BYTE_BUFFER;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization(false);
+        // Partitioner is not set in client mode.
+        if (DatabaseDescriptor.getPartitioner() == null)
+            DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+    }
+
+    private final AbstractSSTableSimpleWriter writer;
+    private final UpdateStatement insert;
+    private final List<ColumnSpecification> boundNames;
+    private final List<TypeCodec> typeCodecs;
+    private final ColumnFamilyStore cfs;
+
+    private StressCQLSSTableWriter(ColumnFamilyStore cfs, AbstractSSTableSimpleWriter writer, UpdateStatement insert, List<ColumnSpecification> boundNames)
+    {
+        this.cfs = cfs;
+        this.writer = writer;
+        this.insert = insert;
+        this.boundNames = boundNames;
+        this.typeCodecs = boundNames.stream().map(bn ->  UDHelper.codecFor(UDHelper.driverType(bn.type)))
+                                             .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns a new builder for a StressCQLSSTableWriter.
+     *
+     * @return the new builder.
+     */
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /**
+     * Adds a new row to the writer.
+     * <p>
+     * This is a shortcut for {@code addRow(Arrays.asList(values))}.
+     *
+     * @param values the row values (corresponding to the bind variables of the
+     * insertion statement used when creating by this writer).
+     * @return this writer.
+     */
+    public StressCQLSSTableWriter addRow(Object... values)
+    throws InvalidRequestException, IOException
+    {
+        return addRow(Arrays.asList(values));
+    }
+
+    /**
+     * Adds a new row to the writer.
+     * <p>
+     * Each provided value type should correspond to the types of the CQL column
+     * the value is for. The correspondance between java type and CQL type is the
+     * same one than the one documented at
+     * www.datastax.com/drivers/java/2.0/apidocs/com/datastax/driver/core/DataType.Name.html#asJavaClass().
+     * <p>
+     * If you prefer providing the values directly as binary, use
+     * {@link #rawAddRow} instead.
+     *
+     * @param values the row values (corresponding to the bind variables of the
+     * insertion statement used when creating by this writer).
+     * @return this writer.
+     */
+    public StressCQLSSTableWriter addRow(List<Object> values)
+    throws InvalidRequestException, IOException
+    {
+        int size = Math.min(values.size(), boundNames.size());
+        List<ByteBuffer> rawValues = new ArrayList<>(size);
+
+        for (int i = 0; i < size; i++)
+        {
+            Object value = values.get(i);
+            rawValues.add(serialize(value, typeCodecs.get(i)));
+        }
+
+        return rawAddRow(rawValues);
+    }
+
+    /**
+     * Adds a new row to the writer.
+     * <p>
+     * This is equivalent to the other addRow methods, but takes a map whose
+     * keys are the names of the columns to add instead of taking a list of the
+     * values in the order of the insert statement used during construction of
+     * this write.
+     * <p>
+     * Please note that the column names in the map keys must be in lowercase unless
+     * the declared column name is a
+     * <a href="http://cassandra.apache.org/doc/cql3/CQL.html#identifiers">case-sensitive quoted identifier</a>
+     * (in which case the map key must use the exact case of the column).
+     *
+     * @param values a map of colum name to column values representing the new
+     * row to add. Note that if a column is not part of the map, it's value will
+     * be {@code null}. If the map contains keys that does not correspond to one
+     * of the column of the insert statement used when creating this writer, the
+     * the corresponding value is ignored.
+     * @return this writer.
+     */
+    public StressCQLSSTableWriter addRow(Map<String, Object> values)
+    throws InvalidRequestException, IOException
+    {
+        int size = boundNames.size();
+        List<ByteBuffer> rawValues = new ArrayList<>(size);
+        for (int i = 0; i < size; i++)
+        {
+            ColumnSpecification spec = boundNames.get(i);
+            Object value = values.get(spec.name.toString());
+            rawValues.add(serialize(value, typeCodecs.get(i)));
+        }
+        return rawAddRow(rawValues);
+    }
+
+    /**
+     * Adds a new row to the writer given already serialized values.
+     *
+     * @param values the row values (corresponding to the bind variables of the
+     * insertion statement used when creating by this writer) as binary.
+     * @return this writer.
+     */
+    public StressCQLSSTableWriter rawAddRow(ByteBuffer... values)
+    throws InvalidRequestException, IOException
+    {
+        return rawAddRow(Arrays.asList(values));
+    }
+
+    /**
+     * Adds a new row to the writer given already serialized values.
+     * <p>
+     * This is a shortcut for {@code rawAddRow(Arrays.asList(values))}.
+     *
+     * @param values the row values (corresponding to the bind variables of the
+     * insertion statement used when creating by this writer) as binary.
+     * @return this writer.
+     */
+    public StressCQLSSTableWriter rawAddRow(List<ByteBuffer> values)
+    throws InvalidRequestException, IOException
+    {
+        if (values.size() != boundNames.size())
+            throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size()));
+
+        QueryOptions options = QueryOptions.forInternalCalls(null, values);
+        List<ByteBuffer> keys = insert.buildPartitionKeyNames(options);
+        SortedSet<Clustering> clusterings = insert.createClustering(options);
+
+        long now = System.currentTimeMillis() * 1000;
+        // Note that we asks indexes to not validate values (the last 'false' arg below) because that triggers a 'Keyspace.open'
+        // and that forces a lot of initialization that we don't want.
+        UpdateParameters params = new UpdateParameters(insert.cfm,
+                                                       insert.updatedColumns(),
+                                                       options,
+                                                       insert.getTimestamp(now, options),
+                                                       insert.getTimeToLive(options),
+                                                       Collections.<DecoratedKey, Partition>emptyMap());
+
+        try
+        {
+            for (ByteBuffer key : keys)
+            {
+                for (Clustering clustering : clusterings)
+                    insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params);
+            }
+            return this;
+        }
+        catch (SSTableSimpleUnsortedWriter.SyncException e)
+        {
+            // If we use a BufferedWriter and had a problem writing to disk, the IOException has been
+            // wrapped in a SyncException (see BufferedWriter below). We want to extract that IOE.
+            throw (IOException)e.getCause();
+        }
+    }
+
+    /**
+     * Adds a new row to the writer given already serialized values.
+     * <p>
+     * This is equivalent to the other rawAddRow methods, but takes a map whose
+     * keys are the names of the columns to add instead of taking a list of the
+     * values in the order of the insert statement used during construction of
+     * this write.
+     *
+     * @param values a map of colum name to column values representing the new
+     * row to add. Note that if a column is not part of the map, it's value will
+     * be {@code null}. If the map contains keys that does not correspond to one
+     * of the column of the insert statement used when creating this writer, the
+     * the corresponding value is ignored.
+     * @return this writer.
+     */
+    public StressCQLSSTableWriter rawAddRow(Map<String, ByteBuffer> values)
+    throws InvalidRequestException, IOException
+    {
+        int size = Math.min(values.size(), boundNames.size());
+        List<ByteBuffer> rawValues = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) 
+        {
+            ColumnSpecification spec = boundNames.get(i);
+            rawValues.add(values.get(spec.name.toString()));
+        }
+        return rawAddRow(rawValues);
+    }
+
+    /**
+     * Returns the User Defined type, used in this SSTable Writer, that can
+     * be used to create UDTValue instances.
+     *
+     * @param dataType name of the User Defined type
+     * @return user defined type
+     */
+    public com.datastax.driver.core.UserType getUDType(String dataType)
+    {
+        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(insert.keyspace());
+        UserType userType = ksm.types.getNullable(ByteBufferUtil.bytes(dataType));
+        return (com.datastax.driver.core.UserType) UDHelper.driverType(userType);
+    }
+
+    /**
+     * Close this writer.
+     * <p>
+     * This method should be called, otherwise the produced sstables are not
+     * guaranteed to be complete (and won't be in practice).
+     */
+    public void close() throws IOException
+    {
+        writer.close();
+    }
+
+    private ByteBuffer serialize(Object value, TypeCodec codec)
+    {
+        if (value == null || value == UNSET_VALUE)
+            return (ByteBuffer) value;
+
+        return codec.serialize(value, ProtocolVersion.NEWEST_SUPPORTED);
+    }
+    /**
+     * The writer loads data in directories corresponding to how they laid out on the server.
+     * <p>
+     * {keyspace}/{table-cfid}/
+     *
+     * This method can be used to fetch the innermost directory with the sstable components
+     * @return The directory containing the sstable components
+     */
+    public File getInnermostDirectory()
+    {
+        return cfs.getDirectories().getDirectoryForNewSSTables();
+    }
+
+    /**
+     * A Builder for a StressCQLSSTableWriter object.
+     */
+    public static class Builder
+    {
+        private final List<File> directoryList;
+        private ColumnFamilyStore cfs;
+
+        protected SSTableFormat.Type formatType = null;
+
+        private Boolean makeRangeAware = false;
+
+        private CreateTableStatement.RawStatement schemaStatement;
+        private final List<CreateTypeStatement> typeStatements;
+        private UpdateStatement.ParsedInsert insertStatement;
+        private IPartitioner partitioner;
+
+        private boolean sorted = false;
+        private long bufferSizeInMB = 128;
+
+        protected Builder()
+        {
+            this.typeStatements = new ArrayList<>();
+            this.directoryList = new ArrayList<>();
+        }
+
+        /**
+         * The directory where to write the sstables.
+         * <p>
+         * This is a mandatory option.
+         *
+         * @param directory the directory to use, which should exists and be writable.
+         * @return this builder.
+         *
+         * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
+         */
+        public Builder inDirectory(String directory)
+        {
+            return inDirectory(new File(directory));
+        }
+
+        /**
+         * The directory where to write the sstables (mandatory option).
+         * <p>
+         * This is a mandatory option.
+         *
+         * @param directory the directory to use, which should exist and be writable.
+         * @return this builder.
+         *
+         * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
+         */
+        public Builder inDirectory(File directory)
+        {
+            if (!directory.exists())
+                throw new IllegalArgumentException(directory + " doesn't exists");
+            if (!directory.canWrite())
+                throw new IllegalArgumentException(directory + " exists but is not writable");
+
+            directoryList.add(directory);
+            return this;
+        }
+
+        /**
+         * A pre-instanciated ColumnFamilyStore
+         * <p>
+         * This is can be used in place of inDirectory and forTable
+         *
+         * @see #inDirectory(File)
+         *
+         * @param cfs the list of directories to use, which should exist and be writable.
+         * @return this builder.
+         *
+         * @throws IllegalArgumentException if a directory doesn't exist or is not writable.
+         */
+        public Builder withCfs(ColumnFamilyStore cfs)
+        {
+            this.cfs = cfs;
+            return this;
+        }
+
+
+        public Builder withType(String typeDefinition) throws SyntaxException
+        {
+            typeStatements.add(parseStatement(typeDefinition, CreateTypeStatement.class, "CREATE TYPE"));
+            return this;
+        }
+
+        /**
+         * The schema (CREATE TABLE statement) for the table for which sstable are to be created.
+         * <p>
+         * Please note that the provided CREATE TABLE statement <b>must</b> use a fully-qualified
+         * table name, one that include the keyspace name.
+         * <p>
+         * This is a mandatory option.
+         *
+         * @param schema the schema of the table for which sstables are to be created.
+         * @return this builder.
+         *
+         * @throws IllegalArgumentException if {@code schema} is not a valid CREATE TABLE statement
+         * or does not have a fully-qualified table name.
+         */
+        public Builder forTable(String schema)
+        {
+            this.schemaStatement = parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE");
+            return this;
+        }
+
+        /**
+         * The partitioner to use.
+         * <p>
+         * By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used
+         * by the cluster for which the SSTables are created, you need to use this method to
+         * provide the correct partitioner.
+         *
+         * @param partitioner the partitioner to use.
+         * @return this builder.
+         */
+        public Builder withPartitioner(IPartitioner partitioner)
+        {
+            this.partitioner = partitioner;
+            return this;
+        }
+
+
+        /**
+         * Specify if the sstable writer should be vnode range aware.
+         * This will create a sstable per vnode range.
+         *
+         * @param makeRangeAware
+         * @return
+         */
+        public Builder rangeAware(boolean makeRangeAware)
+        {
+            this.makeRangeAware = makeRangeAware;
+            return this;
+        }
+
+        /**
+         * The INSERT statement defining the order of the values to add for a given CQL row.
+         * <p>
+         * Please note that the provided INSERT statement <b>must</b> use a fully-qualified
+         * table name, one that include the keyspace name. Morewover, said statement must use
+         * bind variables since it is those bind variables that will be bound to values by the
+         * resulting writer.
+         * <p>
+         * This is a mandatory option, and this needs to be called after foTable().
+         *
+         * @param insert an insertion statement that defines the order
+         * of column values to use.
+         * @return this builder.
+         *
+         * @throws IllegalArgumentException if {@code insertStatement} is not a valid insertion
+         * statement, does not have a fully-qualified table name or have no bind variables.
+         */
+        public Builder using(String insert)
+        {
+            this.insertStatement = parseStatement(insert, UpdateStatement.ParsedInsert.class, "INSERT");
+            return this;
+        }
+
+        /**
+         * The size of the buffer to use.
+         * <p>
+         * This defines how much data will be buffered before being written as
+         * a new SSTable. This correspond roughly to the data size that will have the created
+         * sstable.
+         * <p>
+         * The default is 128MB, which should be reasonable for a 1GB heap. If you experience
+         * OOM while using the writer, you should lower this value.
+         *
+         * @param size the size to use in MB.
+         * @return this builder.
+         */
+        public Builder withBufferSizeInMB(int size)
+        {
+            this.bufferSizeInMB = size;
+            return this;
+        }
+
+        /**
+         * Creates a StressCQLSSTableWriter that expects sorted inputs.
+         * <p>
+         * If this option is used, the resulting writer will expect rows to be
+         * added in SSTable sorted order (and an exception will be thrown if that
+         * is not the case during insertion). The SSTable sorted order means that
+         * rows are added such that their partition key respect the partitioner
+         * order.
+         * <p>
+         * You should thus only use this option is you know that you can provide
+         * the rows in order, which is rarely the case. If you can provide the
+         * rows in order however, using this sorted might be more efficient.
+         * <p>
+         * Note that if used, some option like withBufferSizeInMB will be ignored.
+         *
+         * @return this builder.
+         */
+        public Builder sorted()
+        {
+            this.sorted = true;
+            return this;
+        }
+
+        @SuppressWarnings("resource")
+        public StressCQLSSTableWriter build()
+        {
+            if (directoryList.isEmpty() && cfs == null)
+                throw new IllegalStateException("No output directories specified, you should provide a directory with inDirectory()");
+            if (schemaStatement == null && cfs == null)
+                throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()");
+            if (insertStatement == null)
+                throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
+
+            synchronized (StressCQLSSTableWriter.class)
+            {
+                if (cfs == null)
+                    cfs = createOfflineTable(schemaStatement, typeStatements, directoryList);
+
+                if (partitioner == null)
+                    partitioner = cfs.getPartitioner();
+
+                Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert();
+                AbstractSSTableSimpleWriter writer = sorted
+                                                     ? new SSTableSimpleWriter(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata, preparedInsert.left.updatedColumns())
+                                                     : new SSTableSimpleUnsortedWriter(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata, preparedInsert.left.updatedColumns(), bufferSizeInMB);
+
+                if (formatType != null)
+                    writer.setSSTableFormatType(formatType);
+
+                writer.setRangeAwareWriting(makeRangeAware);
+
+                return new StressCQLSSTableWriter(cfs, writer, preparedInsert.left, preparedInsert.right);
+            }
+        }
+
+        private static void createTypes(String keyspace, List<CreateTypeStatement> typeStatements)
+        {
+            KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+            Types.RawBuilder builder = Types.rawBuilder(keyspace);
+            for (CreateTypeStatement st : typeStatements)
+                st.addToRawBuilder(builder);
+
+            ksm = ksm.withSwapped(builder.build());
+            Schema.instance.setKeyspaceMetadata(ksm);
+        }
+
+        public static ColumnFamilyStore createOfflineTable(String schema, List<File> directoryList)
+        {
+            return createOfflineTable(parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"), Collections.EMPTY_LIST, directoryList);
+        }
+
+        /**
+         * Creates the table according to schema statement
+         * with specified data directories
+         */
+        public static ColumnFamilyStore createOfflineTable(CreateTableStatement.RawStatement schemaStatement, List<CreateTypeStatement> typeStatements, List<File> directoryList)
+        {
+            String keyspace = schemaStatement.keyspace();
+
+            if (Schema.instance.getKSMetaData(keyspace) == null)
+                Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
+
+            createTypes(keyspace, typeStatements);
+
+            KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+
+            CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily());
+            assert cfMetaData == null;
+
+            CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement;
+            statement.validate(ClientState.forInternalCalls());
+
+            //Build metatdata with a portable cfId
+            cfMetaData = statement.metadataBuilder()
+                                  .withId(CFMetaData.generateLegacyCfId(keyspace, statement.columnFamily()))
+                                  .build()
+                                  .params(statement.params());
+
+            Keyspace.setInitialized();
+            Directories directories = new Directories(cfMetaData, directoryList.stream().map(Directories.DataDirectory::new).collect(Collectors.toList()));
+
+            Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
+            ColumnFamilyStore cfs =  ColumnFamilyStore.createColumnFamilyStore(ks, cfMetaData.cfName, cfMetaData, directories, false, false, true);
+
+            ks.initCfCustom(cfs);
+            Schema.instance.load(cfs.metadata);
+            Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfs.metadata)));
+
+            return cfs;
+        }
+
+        /**
+         * Prepares insert statement for writing data to SSTable
+         *
+         * @return prepared Insert statement and it's bound names
+         */
+        private Pair<UpdateStatement, List<ColumnSpecification>> prepareInsert()
+        {
+            ParsedStatement.Prepared cqlStatement = insertStatement.prepare();
+            UpdateStatement insert = (UpdateStatement) cqlStatement.statement;
+            insert.validate(ClientState.forInternalCalls());
+
+            if (insert.hasConditions())
+                throw new IllegalArgumentException("Conditional statements are not supported");
+            if (insert.isCounter())
+                throw new IllegalArgumentException("Counter update statements are not supported");
+            if (cqlStatement.boundNames.isEmpty())
+                throw new IllegalArgumentException("Provided insert statement has no bind variables");
+
+            return Pair.create(insert, cqlStatement.boundNames);
+        }
+    }
+
+    public static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type)
+    {
+        try
+        {
+            ParsedStatement stmt = QueryProcessor.parseStatement(query);
+
+            if (!stmt.getClass().equals(klass))
+                throw new IllegalArgumentException("Invalid query, must be a " + type + " statement but was: " + stmt.getClass());
+
+            return klass.cast(stmt);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
index 664f8d2..4180524 100644
--- a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
+++ b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
@@ -24,39 +24,32 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.stream.Collectors;
 import javax.inject.Inject;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import io.airlift.command.*;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.StressCQLSSTableWriter;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.tools.Util;
 import org.apache.cassandra.tools.nodetool.CompactionStats;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -79,6 +72,11 @@ public abstract class CompactionStress implements Runnable
     @Option(name = {"-v", "--vnodes"}, description = "number of local tokens to generate (default 256)")
     Integer numTokens = 256;
 
+    static
+    {
+        DatabaseDescriptor.toolInitialization();
+    }
+
     List<File> getDataDirectories()
     {
         List<File> dataDirectories = new ArrayList<>(dataDirs.size());
@@ -112,14 +110,12 @@ public abstract class CompactionStress implements Runnable
 
     ColumnFamilyStore initCf(StressProfile stressProfile, boolean loadSSTables)
     {
-        Util.initDatabaseDescriptor();
-
         generateTokens(stressProfile.seedStr, StorageService.instance.getTokenMetadata(), numTokens);
 
         CreateTableStatement.RawStatement createStatement = stressProfile.getCreateStatement();
         List<File> dataDirectories = getDataDirectories();
 
-        ColumnFamilyStore cfs = CQLSSTableWriter.Builder.createOfflineTable(createStatement, Collections.EMPTY_LIST, dataDirectories);
+        ColumnFamilyStore cfs = StressCQLSSTableWriter.Builder.createOfflineTable(createStatement, Collections.EMPTY_LIST, dataDirectories);
 
         if (loadSSTables)
         {
@@ -302,7 +298,7 @@ public abstract class CompactionStress implements Runnable
             {
                 //Every thread needs it's own writer
                 final SchemaInsert insert = stressProfile.getOfflineInsert(null, generator, seedManager, settings);
-                final CQLSSTableWriter tableWriter = insert.createWriter(cfs, bufferSize, makeRangeAware);
+                final StressCQLSSTableWriter tableWriter = insert.createWriter(cfs, bufferSize, makeRangeAware);
                 executorService.submit(() -> {
                     try
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index b45462f..4fb70c6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -62,7 +62,7 @@ import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.Constructor;
 import org.yaml.snakeyaml.error.YAMLException;
 
-import static org.apache.cassandra.io.sstable.CQLSSTableWriter.parseStatement;
+import static org.apache.cassandra.io.sstable.StressCQLSSTableWriter.parseStatement;
 
 public class StressProfile implements Serializable
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
index 96b3392..2c717a1 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -21,26 +21,18 @@ package org.apache.cassandra.stress.operations.userdefined;
  */
 
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
-import com.google.common.util.concurrent.Uninterruptibles;
-
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Statement;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.StressCQLSSTableWriter;
 import org.apache.cassandra.stress.WorkManager;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.report.Timer;
@@ -142,9 +134,9 @@ public class SchemaInsert extends SchemaStatement
 
     private class OfflineRun extends Runner
     {
-        final CQLSSTableWriter writer;
+        final StressCQLSSTableWriter writer;
 
-        OfflineRun(CQLSSTableWriter writer)
+        OfflineRun(StressCQLSSTableWriter writer)
         {
             this.writer = writer;
         }
@@ -182,9 +174,9 @@ public class SchemaInsert extends SchemaStatement
         timeWithRetry(new ThriftRun(client));
     }
 
-    public CQLSSTableWriter createWriter(ColumnFamilyStore cfs, int bufferSize, boolean makeRangeAware)
+    public StressCQLSSTableWriter createWriter(ColumnFamilyStore cfs, int bufferSize, boolean makeRangeAware)
     {
-        return CQLSSTableWriter.builder()
+        return StressCQLSSTableWriter.builder()
                                .withCfs(cfs)
                                .withBufferSizeInMB(bufferSize)
                                .forTable(tableSchema)
@@ -193,7 +185,7 @@ public class SchemaInsert extends SchemaStatement
                                .build();
     }
 
-    public void runOffline(CQLSSTableWriter writer, WorkManager workManager) throws Exception
+    public void runOffline(StressCQLSSTableWriter writer, WorkManager workManager) throws Exception
     {
         OfflineRun offline = new OfflineRun(writer);
 


Mime
View raw message