cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [11/23] Introduce CQL support for stress tool
Date Mon, 07 Jul 2014 17:34:34 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
deleted file mode 100644
index 021c4e8..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.KeyRange;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public final class ThriftRangeSlicer extends Operation
-{
-
-    public ThriftRangeSlicer(State state, long index)
-    {
-        super(state, index);
-    }
-
-    @Override
-    public void run(final ThriftClient client) throws IOException
-    {
-        final SlicePredicate predicate = new SlicePredicate()
-                .setSlice_range(
-                        new SliceRange(
-                                ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                false,
-                                state.settings.columns.maxColumnsPerKey
-                        )
-                );
-
-        final ByteBuffer start = getKey();
-        final KeyRange range =
-                new KeyRange(state.settings.columns.maxColumnsPerKey)
-                        .setStart_key(start)
-                        .setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER)
-                        .setCount(state.settings.command.keysAtOnce);
-
-        for (final ColumnParent parent : state.columnParents)
-        {
-            timeWithRetry(new RunOp()
-            {
-                private int count = 0;
-                @Override
-                public boolean run() throws Exception
-                {
-                    return (count = client.get_range_slices(parent, predicate, range, state.settings.command.consistencyLevel).size()) != 0;
-                }
-
-                @Override
-                public String key()
-                {
-                    return new String(range.bufferForStart_key().array());
-                }
-
-                @Override
-                public int keyCount()
-                {
-                    return count;
-                }
-            });
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
deleted file mode 100644
index dccf469..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.thrift.ColumnOrSuperColumn;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SuperColumn;
-
-public final class ThriftReader extends Operation
-{
-
-    public ThriftReader(State state, long index)
-    {
-        super(state, index);
-    }
-
-    public void run(final ThriftClient client) throws IOException
-    {
-        final SlicePredicate predicate = slicePredicate();
-        final ByteBuffer key = getKey();
-        final List<ByteBuffer> expect = state.rowGen.isDeterministic() ? generateColumnValues(key) : null;
-        for (final ColumnParent parent : state.columnParents)
-        {
-            timeWithRetry(new RunOp()
-            {
-                @Override
-                public boolean run() throws Exception
-                {
-                    List<ColumnOrSuperColumn> row = client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel);
-                    if (expect == null)
-                        return !row.isEmpty();
-                    if (row == null)
-                        return false;
-                    if (!state.settings.columns.useSuperColumns)
-                    {
-                        if (row.size() != expect.size())
-                            return false;
-                        for (int i = 0 ; i < row.size() ; i++)
-                            if (!row.get(i).getColumn().bufferForValue().equals(expect.get(i)))
-                                return false;
-                    }
-                    else
-                    {
-                        for (ColumnOrSuperColumn col : row)
-                        {
-                            SuperColumn superColumn = col.getSuper_column();
-                            if (superColumn.getColumns().size() != expect.size())
-                                return false;
-                            for (int i = 0 ; i < expect.size() ; i++)
-                                if (!superColumn.getColumns().get(i).bufferForValue().equals(expect.get(i)))
-                                    return false;
-                        }
-                    }
-                    return true;
-                }
-
-                @Override
-                public String key()
-                {
-                    return new String(key.array());
-                }
-
-                @Override
-                public int keyCount()
-                {
-                    return 1;
-                }
-            });
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
new file mode 100644
index 0000000..f794e75
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
@@ -0,0 +1,84 @@
+package org.apache.cassandra.stress.operations.predefined;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+
+public class CqlCounterAdder extends CqlOperation<Integer>
+{
+    // Predefined stress operation for Command.COUNTER_WRITE: one UPDATE that adds a generated delta to every counter column of a key.
+    final Distribution counteradd; // source of the increments; next() is called once per bind marker in getQueryParameters()
+    public CqlCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, StressSettings settings)
+    {
+        super(Command.COUNTER_WRITE, timer, generator, settings);
+        this.counteradd = counteradd.get();
+    }
+    // Builds: UPDATE TABLE [USING CONSISTENCY LEVEL] SET C0=C0+?,C1=C1+?,... WHERE KEY=?
+    @Override
+    protected String buildQuery()
+    {
+        String counterCF = isCql2() ? type.table : "Counter3"; // CQL2 targets type.table; otherwise the literal "Counter3" is used
+
+        StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(counterCF));
+
+        if (isCql2()) // only CQL2 carries the consistency level inside the statement text
+            query.append(" USING CONSISTENCY ").append(settings.command.consistencyLevel);
+
+        query.append(" SET ");
+
+        // TODO : increment distribution subset of columns
+        for (int i = 0; i < settings.columns.maxColumnsPerKey; i++)
+        {
+            if (i > 0)
+                query.append(",");
+
+            query.append('C').append(i).append("=C").append(i).append("+?"); // columns are named C0, C1, ...; each gets its own bind marker
+        }
+        query.append(" WHERE KEY=?");
+        return query.toString();
+    }
+    // Bind values in statement order: one increment per SET column, then the key for the WHERE clause.
+    @Override
+    protected List<Object> getQueryParameters(byte[] key)
+    {
+        final List<Object> list = new ArrayList<>();
+        for (int i = 0; i < settings.columns.maxColumnsPerKey; i++)
+            list.add(counteradd.next());
+        list.add(ByteBuffer.wrap(key));
+        return list;
+    }
+    // Uses CqlRunOpAlwaysSucceed with a key count of 1, so success depends only on the statement executing without error.
+    @Override
+    protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+    {
+        return new CqlRunOpAlwaysSucceed(client, query, queryId, params, key, 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
new file mode 100644
index 0000000..94c8faf
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
@@ -0,0 +1,74 @@
+package org.apache.cassandra.stress.operations.predefined;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+
+public class CqlCounterGetter extends CqlOperation<Integer>
+{
+    // Predefined stress operation for Command.COUNTER_READ: one SELECT of the counter columns stored under a single key.
+    public CqlCounterGetter(Timer timer, PartitionGenerator generator, StressSettings settings)
+    {
+        super(Command.COUNTER_READ, timer, generator, settings);
+    }
+    // The key is the only bind value, matching the single "?" in the WHERE clause built by buildQuery().
+    @Override
+    protected List<Object> getQueryParameters(byte[] key)
+    {
+        return Collections.<Object>singletonList(ByteBuffer.wrap(key));
+    }
+    // Builds: SELECT (FIRST n ''..'' for CQL2, * otherwise) FROM TABLE [USING CONSISTENCY LEVEL] WHERE KEY=?
+    @Override
+    protected String buildQuery()
+    {
+        StringBuilder query = new StringBuilder("SELECT ");
+
+        // TODO: obey slice/noslice option (instead of always slicing)
+        if (isCql2())
+            query.append("FIRST ").append(settings.columns.maxColumnsPerKey).append(" ''..''"); // CQL2: first maxColumnsPerKey columns of an unbounded range
+        else
+            query.append("*");
+
+        String counterCF = isCql2() ? type.table : "Counter3"; // CQL2 targets type.table; otherwise the literal "Counter3" is used
+
+        query.append(" FROM ").append(wrapInQuotesIfRequired(counterCF));
+
+        if (isCql2()) // only CQL2 carries the consistency level inside the statement text
+            query.append(" USING CONSISTENCY ").append(settings.command.consistencyLevel);
+
+        return query.append(" WHERE KEY=?").toString();
+    }
+    // Uses CqlRunOpTestNonEmpty, so a read only counts as successful when its Integer result is greater than zero.
+    @Override
+    protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+    {
+        return new CqlRunOpTestNonEmpty(client, query, queryId, params, key);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
new file mode 100644
index 0000000..c422f2b
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
@@ -0,0 +1,79 @@
+package org.apache.cassandra.stress.operations.predefined;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class CqlInserter extends CqlOperation<Integer>
+{
+    // Predefined stress operation for Command.WRITE: one UPDATE that sets the first maxColumnsPerKey configured columns of a key.
+    public CqlInserter(Timer timer, PartitionGenerator generator, StressSettings settings)
+    {
+        super(Command.WRITE, timer, generator, settings);
+    }
+    // Builds: UPDATE TABLE [USING CONSISTENCY LEVEL] SET name0 = ?,name1 = ?,... WHERE KEY=?
+    @Override
+    protected String buildQuery()
+    {
+        StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(type.table));
+
+        if (isCql2()) // only CQL2 carries the consistency level inside the statement text
+            query.append(" USING CONSISTENCY ").append(settings.command.consistencyLevel);
+
+        query.append(" SET ");
+
+        for (int i = 0 ; i < settings.columns.maxColumnsPerKey ; i++)
+        {
+            if (i > 0)
+                query.append(',');
+
+            query.append(wrapInQuotesIfRequired(settings.columns.namestrs.get(i))).append(" = ?"); // column names come from settings.columns.namestrs, by index
+        }
+
+        query.append(" WHERE KEY=?");
+        return query.toString();
+    }
+    // Bind values in statement order: the column values first, then the key for the WHERE clause.
+    @Override
+    protected List<Object> getQueryParameters(byte[] key)
+    {
+        final ArrayList<Object> queryParams = new ArrayList<>();
+        List<ByteBuffer> values = getColumnValues(); // declared outside this view; presumably one value per SET column - confirm the count matches buildQuery()
+        queryParams.addAll(values);
+        queryParams.add(ByteBuffer.wrap(key));
+        return queryParams;
+    }
+    // Uses CqlRunOpAlwaysSucceed with a key count of 1, so success depends only on the statement executing without error.
+    @Override
+    protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+    {
+        return new CqlRunOpAlwaysSucceed(client, query, queryId, params, key, 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
new file mode 100644
index 0000000..0264cd1
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Function;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.StressMetrics;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.ConnectionStyle;
+import org.apache.cassandra.stress.settings.CqlVersion;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlRow;
+import org.apache.cassandra.thrift.ThriftConversion;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+
+public abstract class CqlOperation<V> extends PredefinedOperation
+{
+
+    protected abstract List<Object> getQueryParameters(byte[] key);
+    protected abstract String buildQuery();
+    protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key);
+
+    public CqlOperation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings)
+    { // Passes all arguments to the superclass, then rejects settings the CQL operations cannot honour.
+        super(type, timer, generator, settings);
+        if (settings.columns.variableColumnCount) // unsupported for CQL: fail at construction instead of at execution time
+            throw new IllegalStateException("Variable column counts are not implemented for CQL");
+    }
+
+    protected CqlRunOp<V> run(final ClientWrapper client, final List<Object> queryParams, final ByteBuffer key) throws IOException
+    { // Builds a run-op from the cached (or newly built) statement, passes it to timeWithRetry and returns it.
+        final CqlRunOp<V> op; // receives either a prepared-statement id or the query text, never both
+        if (settings.mode.style == ConnectionStyle.CQL_PREPARED) // prepared mode: the cache slot holds the prepared-statement id
+        {
+            final Object id;
+            Object idobj = getCqlCache();
+            if (idobj == null) // nothing cached yet: prepare the statement now and remember its id
+            {
+                try
+                {
+                    id = client.createPreparedStatement(buildQuery());
+                } catch (TException e)
+                {
+                    throw new RuntimeException(e); // rethrown unchecked with the TException kept as the cause
+                }
+                storeCqlCache(id);
+            }
+            else
+                id = idobj;
+            // The query text is passed as null; the non-null id makes CqlRunOp.run() execute by id.
+            op = buildRunOp(client, null, id, queryParams, key);
+        }
+        else
+        {
+            final String query;
+            Object qobj = getCqlCache(); // non-prepared mode: the same cache slot holds the query string itself
+            if (qobj == null)
+                storeCqlCache(query = buildQuery());
+            else
+                query = qobj.toString();
+            // The id is passed as null, so CqlRunOp.run() executes the query string.
+            op = buildRunOp(client, query, null, queryParams, key);
+        }
+
+        timeWithRetry(op); // declared outside this view
+        return op;
+    }
+
+    protected void run(final ClientWrapper client) throws IOException
+    { // Obtains a key, derives the bind parameters from it and delegates to run(client, queryParams, key).
+        final byte[] key = getKey().array(); // NOTE(review): array() returns the whole backing array regardless of position/limit - assumes getKey() yields an exactly-sized heap buffer; confirm
+        final List<Object> queryParams = getQueryParameters(key);
+        run(client, queryParams, ByteBuffer.wrap(key)); // the same bytes are re-wrapped as the ByteBuffer key given to the run-op
+    }
+
+    // Classes to process Cql results
+
+    // Run-op whose validate() always returns true, so it succeeds whenever the query executes without error; the keyCount given at construction is reported as both partitionCount() and rowCount()
+    protected final class CqlRunOpAlwaysSucceed extends CqlRunOp<Integer>
+    {
+
+        final int keyCount; // supplied by the caller; not derived from the query result
+
+        protected CqlRunOpAlwaysSucceed(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key, int keyCount)
+        {
+            super(client, query, queryId, RowCountHandler.INSTANCE, params, key); // RowCountHandler is declared outside this view
+            this.keyCount = keyCount;
+        }
+
+        @Override
+        public boolean validate(Integer result)
+        {
+            return true; // the result value is not inspected
+        }
+
+        @Override
+        public int partitionCount()
+        {
+            return keyCount;
+        }
+
+        @Override
+        public int rowCount()
+        {
+            return keyCount;
+        }
+    }
+
+    // Succeeds so long as the result set is nonempty, and the query executes without error
+    protected final class CqlRunOpTestNonEmpty extends CqlRunOp<Integer>
+    {
+        // The Integer result comes from RowCountHandler.INSTANCE (declared outside this view) and doubles as the reported partition and row count.
+        protected CqlRunOpTestNonEmpty(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+        {
+            super(client, query, queryId, RowCountHandler.INSTANCE, params, key);
+        }
+
+        @Override
+        public boolean validate(Integer result)
+        {
+            return result > 0; // unboxes the parameter; NOTE(review): throws NullPointerException if the handler can return null - confirm it cannot
+        }
+
+        @Override
+        public int partitionCount()
+        {
+            return result; // the field stored by CqlRunOp.run(), auto-unboxed; null (and so an NPE) before run() has executed
+        }
+
+        @Override
+        public int rowCount()
+        {
+            return result; // same stored field as partitionCount()
+        }
+    }
+
+    // Requires a custom validate() method, but fetches and stores the keys from the result set for further processing
+    protected abstract class CqlRunOpFetchKeys extends CqlRunOp<byte[][]>
+    {
+        // validate() stays abstract for subclasses; the byte[][] result is produced by KeysHandler.INSTANCE (declared outside this view).
+        protected CqlRunOpFetchKeys(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+        {
+            super(client, query, queryId, KeysHandler.INSTANCE, params, key);
+        }
+
+        @Override
+        public int partitionCount()
+        {
+            return result.length; // NOTE(review): no null guard - result is null until CqlRunOp.run() has stored a value
+        }
+
+        @Override
+        public int rowCount()
+        {
+            return result.length; // same value and same caveat as partitionCount()
+        }
+    }
+
+    protected final class CqlRunOpMatchResults extends CqlRunOp<ByteBuffer[][]>
+    {
+        // Run-op that compares the returned rows, position by position, against an expected list of rows.
+        final List<List<ByteBuffer>> expect; // expected rows in result order; a null entry accepts any row at that position
+
+        // a null value for an item in expect means we just check the row is present
+        protected CqlRunOpMatchResults(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key, List<List<ByteBuffer>> expect)
+        {
+            super(client, query, queryId, RowsHandler.INSTANCE, params, key); // RowsHandler is declared outside this view
+            this.expect = expect;
+        }
+
+        @Override
+        public int partitionCount()
+        {
+            return result == null ? 0 : result.length; // result is null until run() has stored a value, so report 0 then
+        }
+
+        @Override
+        public int rowCount()
+        {
+            return result == null ? 0 : result.length; // same value as partitionCount()
+        }
+
+        public boolean validate(ByteBuffer[][] result) // implements CqlRunOp.validate; note there is no @Override annotation here
+        {
+            if (result.length != expect.size()) // the number of rows must match exactly
+                return false;
+            for (int i = 0 ; i < result.length ; i++)
+                if (expect.get(i) != null && !expect.get(i).equals(Arrays.asList(result[i]))) // non-null expectations are compared with List.equals against the row's values
+                    return false;
+            return true;
+        }
+    }
+
+    // Base run-op for CQL: executes the statement through a ClientWrapper, stores the handler's result and validates it
+    protected abstract class CqlRunOp<V> implements RunOp // note: this V is a new type parameter that shadows the V of the enclosing CqlOperation
+    {
+
+        final ClientWrapper client;
+        final String query; // query text; only used when queryId is null
+        final Object queryId; // prepared-statement id; when non-null, run() executes by id
+        final List<Object> params;
+        final ByteBuffer key;
+        final ResultHandler<V> handler;
+        V result; // value returned by the most recent execute(); null until run() has executed
+
+        private CqlRunOp(ClientWrapper client, String query, Object queryId, ResultHandler<V> handler, List<Object> params, ByteBuffer key)
+        {
+            this.client = client;
+            this.query = query;
+            this.queryId = queryId;
+            this.handler = handler;
+            this.params = params;
+            this.key = key;
+        }
+
+        @Override
+        public boolean run() throws Exception
+        {
+            return queryId != null // either branch stores the outcome in result before validating it
+            ? validate(result = client.execute(queryId, key, params, handler))
+            : validate(result = client.execute(query, key, params, handler));
+        }
+
+        public abstract boolean validate(V result); // returns whether the given result counts as a successful operation
+
+    }
+
+
+    // Client wrapping/unwrapping: each supported client type is adapted to the common ClientWrapper interface
+
+
+    @Override
+    public void run(final ThriftClient client) throws IOException
+    {
+        run(wrap(client)); // adapt to ClientWrapper (CQL3 or CQL2 flavour), then take the shared run(ClientWrapper) path
+    }
+
+    @Override
+    public void run(SimpleClient client) throws IOException
+    {
+        run(wrap(client)); // adapt to ClientWrapper, then take the shared run(ClientWrapper) path
+    }
+
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        run(wrap(client)); // adapt to ClientWrapper, then take the shared run(ClientWrapper) path
+    }
+
+    public ClientWrapper wrap(ThriftClient client)
+    {
+        return isCql3() // the Thrift client is the only one with separate CQL3 and CQL2 wrappers
+                ? new Cql3CassandraClientWrapper(client)
+                : new Cql2CassandraClientWrapper(client);
+
+    }
+
+    public ClientWrapper wrap(JavaDriverClient client)
+    {
+        return new JavaDriverWrapper(client); // a new wrapper instance is created on every call
+    }
+
+    public ClientWrapper wrap(SimpleClient client)
+    {
+        return new SimpleClientWrapper(client); // a new wrapper instance is created on every call
+    }
+
+    protected interface ClientWrapper
+    { // Common interface over the supported clients: prepare a statement, then execute it by id or by query text.
+        Object createPreparedStatement(String cqlQuery) throws TException; // the id type is implementation-specific; each implementation casts it back in execute(Object, ...)
+        <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException; // prepared path
+        <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException; // query-text path
+    }
+
+    private final class JavaDriverWrapper implements ClientWrapper
+    { // ClientWrapper backed by JavaDriverClient; results are converted with handler.javaDriverHandler().
+        final JavaDriverClient client;
+        private JavaDriverWrapper(JavaDriverClient client)
+        {
+            this.client = client;
+        }
+
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
+        {
+            String formattedQuery = formatCqlQuery(query, queryParams, isCql3()); // formatCqlQuery is declared outside this view; the key argument is not used by this wrapper
+            return handler.javaDriverHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(settings.command.consistencyLevel)));
+        }
+
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
+        {
+            return handler.javaDriverHandler().apply(
+                    client.executePrepared(
+                            (PreparedStatement) preparedStatementId, // expected to be the object returned by createPreparedStatement()
+                            queryParams,
+                            ThriftConversion.fromThrift(settings.command.consistencyLevel)));
+        }
+
+        @Override
+        public Object createPreparedStatement(String cqlQuery)
+        {
+            return client.prepare(cqlQuery); // the returned object itself serves as the prepared-statement id
+        }
+    }
+
+    private final class SimpleClientWrapper implements ClientWrapper
+    { // ClientWrapper backed by SimpleClient; despite its name, thriftHandler() is declared to consume a transport ResultMessage.
+        final SimpleClient client;
+        private SimpleClientWrapper(SimpleClient client)
+        {
+            this.client = client;
+        }
+
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
+        {
+            String formattedQuery = formatCqlQuery(query, queryParams, isCql3()); // formatCqlQuery is declared outside this view; the key argument is not used by this wrapper
+            return handler.thriftHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(settings.command.consistencyLevel)));
+        }
+
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
+        {
+            return handler.thriftHandler().apply(
+                    client.executePrepared(
+                            (byte[]) preparedStatementId, // expected to be the byte[] returned by createPreparedStatement()
+                            toByteBufferParams(queryParams), // declared outside this view; presumably converts each parameter to a ByteBuffer - confirm
+                            ThriftConversion.fromThrift(settings.command.consistencyLevel)));
+        }
+
+        @Override
+        public Object createPreparedStatement(String cqlQuery)
+        {
+            return client.prepare(cqlQuery).statementId.bytes; // the raw id bytes serve as the prepared-statement id
+        }
+    }
+
+    // ClientWrapper for CQL3 over the Thrift client; the consistency level is passed as an argument on every call
+    private final class Cql3CassandraClientWrapper implements ClientWrapper
+    {
+        final ThriftClient client;
+        private Cql3CassandraClientWrapper(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
+        {
+            String formattedQuery = formatCqlQuery(query, queryParams, true); // third argument fixed to true here, where the non-Thrift wrappers pass isCql3()
+            return handler.simpleNativeHandler().apply(
+                    client.execute_cql3_query(formattedQuery, key, Compression.NONE, settings.command.consistencyLevel)
+            );
+        }
+
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
+        {
+            Integer id = (Integer) preparedStatementId; // expected to be the Integer returned by createPreparedStatement()
+            return handler.simpleNativeHandler().apply(
+                    client.execute_prepared_cql3_query(id, key, toByteBufferParams(queryParams), settings.command.consistencyLevel)
+            );
+        }
+
+        @Override
+        public Object createPreparedStatement(String cqlQuery) throws TException
+        {
+            return client.prepare_cql3_query(cqlQuery, Compression.NONE); // statement text is sent uncompressed
+        }
+    }
+
+    // ClientWrapper that issues CQL2 statements through a ThriftClient
+    private final class Cql2CassandraClientWrapper implements ClientWrapper
+    {
+        final ThriftClient client;
+        private Cql2CassandraClientWrapper(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        // Inlines the parameters into the query text (CQL2 blob syntax) and runs it as a plain statement
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
+        {
+            final String cql = formatCqlQuery(query, queryParams, false);
+            final Function<CqlResult, V> convert = handler.simpleNativeHandler();
+            final CqlResult response = client.execute_cql_query(cql, key, Compression.NONE);
+            return convert.apply(response);
+        }
+
+        // Runs a statement previously prepared by createPreparedStatement, binding the serialized parameters
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
+        {
+            final Integer statementId = (Integer) preparedStatementId;
+            final Function<CqlResult, V> convert = handler.simpleNativeHandler();
+            final CqlResult response = client.execute_prepared_cql_query(statementId, key, toByteBufferParams(queryParams));
+            return convert.apply(response);
+        }
+
+        // Prepares the statement; the returned value is what the prepared execute() expects as its id
+        @Override
+        public Object createPreparedStatement(String cqlQuery) throws TException
+        {
+            return client.prepare_cql_query(cqlQuery, Compression.NONE);
+        }
+    }
+
+    // interface for building functions to standardise results from each client
+    // NOTE: the method names do not line up with the clients that use them in this file:
+    // the SimpleClient wrapper applies thriftHandler(), while the ThriftClient wrappers
+    // apply simpleNativeHandler()
+    protected static interface ResultHandler<V>
+    {
+        // converts a ResultSet
+        Function<ResultSet, V> javaDriverHandler();
+        // converts a ResultMessage, as returned by SimpleClient
+        Function<ResultMessage, V> thriftHandler();
+        // converts a CqlResult, as returned by ThriftClient
+        Function<CqlResult, V> simpleNativeHandler();
+    }
+
+    protected static class RowCountHandler implements ResultHandler<Integer>
+    {
+        static final RowCountHandler INSTANCE = new RowCountHandler();
+
+        @Override
+        public Function<ResultSet, Integer> javaDriverHandler()
+        {
+            return new Function<ResultSet, Integer>()
+            {
+                @Override
+                public Integer apply(ResultSet rows)
+                {
+                    if (rows == null)
+                        return 0;
+                    return rows.all().size();
+                }
+            };
+        }
+
+        @Override
+        public Function<ResultMessage, Integer> thriftHandler()
+        {
+            return new Function<ResultMessage, Integer>()
+            {
+                @Override
+                public Integer apply(ResultMessage result)
+                {
+                    return result instanceof ResultMessage.Rows ? ((ResultMessage.Rows) result).result.size() : 0;
+                }
+            };
+        }
+
+        @Override
+        public Function<CqlResult, Integer> simpleNativeHandler()
+        {
+            return new Function<CqlResult, Integer>()
+            {
+
+                @Override
+                public Integer apply(CqlResult result)
+                {
+                    switch (result.getType())
+                    {
+                        case ROWS:
+                            return result.getRows().size();
+                        default:
+                            return 1;
+                    }
+                }
+            };
+        }
+
+    }
+
+    // Processes results from each client into an array of all key bytes returned
+    protected static final class RowsHandler implements ResultHandler<ByteBuffer[][]>
+    {
+        static final RowsHandler INSTANCE = new RowsHandler();
+
+        @Override
+        public Function<ResultSet, ByteBuffer[][]> javaDriverHandler()
+        {
+            return new Function<ResultSet, ByteBuffer[][]>()
+            {
+
+                @Override
+                public ByteBuffer[][] apply(ResultSet result)
+                {
+                    if (result == null)
+                        return new ByteBuffer[0][];
+                    List<Row> rows = result.all();
+
+                    ByteBuffer[][] r = new ByteBuffer[rows.size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                    {
+                        Row row = rows.get(i);
+                        r[i] = new ByteBuffer[row.getColumnDefinitions().size()];
+                        for (int j = 0 ; j < row.getColumnDefinitions().size() ; j++)
+                            r[i][j] = row.getBytes(j);
+                    }
+                    return r;
+                }
+            };
+        }
+
+        @Override
+        public Function<ResultMessage, ByteBuffer[][]> thriftHandler()
+        {
+            return new Function<ResultMessage, ByteBuffer[][]>()
+            {
+
+                @Override
+                public ByteBuffer[][] apply(ResultMessage result)
+                {
+                    if (!(result instanceof ResultMessage.Rows))
+                        return new ByteBuffer[0][];
+
+                    ResultMessage.Rows rows = ((ResultMessage.Rows) result);
+                    ByteBuffer[][] r = new ByteBuffer[rows.result.size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                    {
+                        List<ByteBuffer> row = rows.result.rows.get(i);
+                        r[i] = new ByteBuffer[row.size()];
+                        for (int j = 0 ; j < row.size() ; j++)
+                            r[i][j] = row.get(j);
+                    }
+                    return r;
+                }
+            };
+        }
+
+        @Override
+        public Function<CqlResult, ByteBuffer[][]> simpleNativeHandler()
+        {
+            return new Function<CqlResult, ByteBuffer[][]>()
+            {
+
+                @Override
+                public ByteBuffer[][] apply(CqlResult result)
+                {
+                    ByteBuffer[][] r = new ByteBuffer[result.getRows().size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                    {
+                        CqlRow row = result.getRows().get(i);
+                        r[i] = new ByteBuffer[row.getColumns().size()];
+                        for (int j = 0 ; j < r[i].length ; j++)
+                            r[i][j] = ByteBuffer.wrap(row.getColumns().get(j).getValue());
+                    }
+                    return r;
+                }
+            };
+        }
+
+    }
+    // Processes results from each client into an array of all key bytes returned:
+    // the first column of every row for ResultSet and ResultMessage results, the row key
+    // for CqlResult results. A result without rows yields an empty array, never null.
+    protected static final class KeysHandler implements ResultHandler<byte[][]>
+    {
+        static final KeysHandler INSTANCE = new KeysHandler();
+
+        // Copies exactly the readable bytes of the buffer without disturbing its position.
+        // ByteBuffer.array() is not used because it exposes the whole backing array
+        // (ignoring position and limit) and throws for direct or read-only buffers.
+        private static byte[] toBytes(ByteBuffer buffer)
+        {
+            ByteBuffer view = buffer.duplicate();
+            byte[] bytes = new byte[view.remaining()];
+            view.get(bytes);
+            return bytes;
+        }
+
+        @Override
+        public Function<ResultSet, byte[][]> javaDriverHandler()
+        {
+            return new Function<ResultSet, byte[][]>()
+            {
+
+                @Override
+                public byte[][] apply(ResultSet result)
+                {
+
+                    if (result == null)
+                        return new byte[0][];
+                    List<Row> rows = result.all();
+                    byte[][] r = new byte[rows.size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                        r[i] = toBytes(rows.get(i).getBytes(0));
+                    return r;
+                }
+            };
+        }
+
+        @Override
+        public Function<ResultMessage, byte[][]> thriftHandler()
+        {
+            return new Function<ResultMessage, byte[][]>()
+            {
+
+                @Override
+                public byte[][] apply(ResultMessage result)
+                {
+                    // consistent with the other handlers: no rows means an empty array, not null
+                    if (!(result instanceof ResultMessage.Rows))
+                        return new byte[0][];
+
+                    ResultMessage.Rows rows = ((ResultMessage.Rows) result);
+                    byte[][] r = new byte[rows.result.size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                        r[i] = toBytes(rows.result.rows.get(i).get(0));
+                    return r;
+                }
+            };
+        }
+
+        @Override
+        public Function<CqlResult, byte[][]> simpleNativeHandler()
+        {
+            return new Function<CqlResult, byte[][]>()
+            {
+
+                @Override
+                public byte[][] apply(CqlResult result)
+                {
+                    byte[][] r = new byte[result.getRows().size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                        r[i] = result.getRows().get(i).getKey();
+                    return r;
+                }
+            };
+        }
+
+    }
+
+    private static String getUnQuotedCqlBlob(ByteBuffer term, boolean isCQL3)
+    {
+        return isCQL3
+                ? "0x" + ByteBufferUtil.bytesToHex(term)
+                : ByteBufferUtil.bytesToHex(term);
+    }
+
+    /**
+     * Constructs a CQL query string by replacing each instance of the character
+     * '?' with the corresponding parameter, in order. Markers left over once the
+     * parameters are exhausted are kept as-is.
+     *
+     * @param query base query string to format
+     * @param parms query parameters; each must be a ByteBuffer (rendered as an
+     *              unquoted hex blob) or a Long (rendered in decimal)
+     * @param isCql3 true to render blobs in their CQL3 form (with a "0x" prefix)
+     * @return formatted CQL query string; the query unchanged if it contains
+     *         no marker or parms is empty
+     * @throws AssertionError if a parameter is neither a ByteBuffer nor a Long
+     */
+    private static String formatCqlQuery(String query, List<Object> parms, boolean isCql3)
+    {
+        int marker = query.indexOf('?');
+        if (marker == -1 || parms.isEmpty())
+            return query;
+
+        int position = 0;
+        StringBuilder result = new StringBuilder();
+        for (Object parm : parms)
+        {
+            result.append(query, position, marker);
+            if (parm instanceof ByteBuffer)
+                result.append(getUnQuotedCqlBlob((ByteBuffer) parm, isCql3));
+            else if (parm instanceof Long)
+                result.append(parm.toString());
+            else
+                throw new AssertionError("Unsupported query parameter: " + parm);
+
+            position = marker + 1;
+            // resume the search at position itself: searching from position + 1 would
+            // skip a marker that immediately follows the one just replaced
+            marker = query.indexOf('?', position);
+            if (marker == -1)
+                break;
+        }
+
+        if (position < query.length())
+            result.append(query.substring(position));
+
+        return result.toString();
+    }
+
+    private static List<ByteBuffer> toByteBufferParams(List<Object> params)
+    {
+        List<ByteBuffer> r = new ArrayList<>();
+        for (Object param : params)
+        {
+            if (param instanceof ByteBuffer)
+                r.add((ByteBuffer) param);
+            else if (param instanceof Long)
+                r.add(ByteBufferUtil.bytes((Long) param));
+            else throw new AssertionError();
+        }
+        return r;
+    }
+
+    // Surrounds the identifier with double quotes when the configured CQL version is CQL3;
+    // otherwise returns it unchanged
+    protected String wrapInQuotesIfRequired(String string)
+    {
+        if (settings.mode.cqlVersion != CqlVersion.CQL3)
+            return string;
+        return "\"" + string + "\"";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
new file mode 100644
index 0000000..3a7f75a
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
@@ -0,0 +1,87 @@
+package org.apache.cassandra.stress.operations.predefined;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * CQL implementation of the READ command: selects the columns of a single key
+ * and has the fetched rows compared against the expected column values.
+ */
+public class CqlReader extends CqlOperation<ByteBuffer[][]>
+{
+
+    public CqlReader(Timer timer, PartitionGenerator generator, StressSettings settings)
+    {
+        super(Command.READ, timer, generator, settings);
+    }
+
+    /**
+     * Builds the SELECT statement with a single '?' marker for the key.
+     * In slice mode CQL2 uses a FIRST n ''..'' range and other versions select *;
+     * otherwise the first maxColumnsPerKey column names are listed explicitly.
+     */
+    @Override
+    protected String buildQuery()
+    {
+        final StringBuilder cql = new StringBuilder("SELECT ");
+
+        if (!settings.columns.slice)
+        {
+            String separator = "";
+            for (int i = 0; i < settings.columns.maxColumnsPerKey ; i++)
+            {
+                cql.append(separator).append(wrapInQuotesIfRequired(settings.columns.namestrs.get(i)));
+                separator = ",";
+            }
+        }
+        else if (isCql2())
+        {
+            cql.append("FIRST ").append(settings.columns.maxColumnsPerKey).append(" ''..''");
+        }
+        else
+        {
+            cql.append("*");
+        }
+
+        cql.append(" FROM ").append(wrapInQuotesIfRequired(type.table));
+
+        // CQL2 carries the consistency level inside the statement text
+        if (isCql2())
+            cql.append(" USING CONSISTENCY ").append(settings.command.consistencyLevel);
+        cql.append(" WHERE KEY=?");
+        return cql.toString();
+    }
+
+    /** The only query parameter is the key itself, wrapped in a ByteBuffer. */
+    @Override
+    protected List<Object> getQueryParameters(byte[] key)
+    {
+        final Object wrappedKey = ByteBuffer.wrap(key);
+        return Collections.singletonList(wrappedKey);
+    }
+
+    /** Creates the run op with a single expected row made of the operation's column values. */
+    @Override
+    protected CqlRunOp<ByteBuffer[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+    {
+        final List<ByteBuffer> expectedRow = getColumnValues();
+        final List<List<ByteBuffer>> expectedRows = Arrays.asList(expectedRow);
+        return new CqlRunOpMatchResults(client, query, queryId, params, key, expectedRows);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
new file mode 100644
index 0000000..7f6412b
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.StressMetrics;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.DistributionFixed;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.CqlVersion;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Base class of the predefined stress operations. It records the command being run,
+ * draws how many columns each operation touches from the configured distribution,
+ * and provides the factory that maps a command and connection style to a concrete operation.
+ */
+public abstract class PredefinedOperation extends Operation
+{
+    // the command this operation implements; also used as its string form
+    public final Command type;
+    // distribution the number of columns per operation is drawn from
+    private final Distribution columnCount;
+    // opaque slot for subclasses to stash CQL state via storeCqlCache/getCqlCache
+    private Object cqlCache;
+
+    public PredefinedOperation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings)
+    {
+        // NOTE(review): the fixed distribution of 1 presumably means one partition per operation - confirm against Operation
+        super(timer, generator, settings, new DistributionFixed(1));
+        this.type = type;
+        this.columnCount = settings.columns.countDistribution.get();
+    }
+
+    // true when the configured CQL version is CQL3
+    public boolean isCql3()
+    {
+        return settings.mode.cqlVersion == CqlVersion.CQL3;
+    }
+    // true when the configured CQL version is CQL2
+    public boolean isCql2()
+    {
+        return settings.mode.cqlVersion == CqlVersion.CQL2;
+    }
+    public Object getCqlCache()
+    {
+        return cqlCache;
+    }
+    public void storeCqlCache(Object val)
+    {
+        cqlCache = val;
+    }
+
+    // returns component 0 of the first partition's key, cast to a ByteBuffer
+    protected ByteBuffer getKey()
+    {
+        return (ByteBuffer) partitions.get(0).getPartitionKey(0);
+    }
+
+    // A selection of columns: either an explicit list of indices, or (when indices is null)
+    // the contiguous range [lb, ub)
+    final class ColumnSelection
+    {
+        final int[] indices;
+        final int lb, ub;
+        private ColumnSelection(int[] indices, int lb, int ub)
+        {
+            this.indices = indices;
+            this.lb = lb;
+            this.ub = ub;
+        }
+
+        // returns a new list holding the selected elements of in, in selection order
+        public <V> List<V> select(List<V> in)
+        {
+            List<V> out = new ArrayList<>();
+            if (indices != null)
+            {
+                for (int i : indices)
+                    out.add(in.get(i));
+            }
+            else
+            {
+                out.addAll(in.subList(lb, ub));
+            }
+            return out;
+        }
+
+        // number of selected columns
+        int count()
+        {
+            return indices != null ? indices.length : ub - lb;
+        }
+
+        // builds the Thrift predicate for this selection: a slice range starting at the name of
+        // column lb with an empty finish and a limit of count() for a range selection,
+        // or the explicit column names for an index selection
+        SlicePredicate predicate()
+        {
+            final SlicePredicate predicate = new SlicePredicate();
+            if (indices == null)
+            {
+                predicate.setSlice_range(new SliceRange()
+                                         .setStart(settings.columns.names.get(lb))
+                                         .setFinish(new byte[] {})
+                                         .setReversed(false)
+                                         .setCount(count())
+                );
+            }
+            else
+                predicate.setColumn_names(select(settings.columns.names));
+            return predicate;
+
+        }
+    }
+
+    public String toString()
+    {
+        return type.toString();
+    }
+
+    // Draws a column count and picks that many columns: a contiguous range in slice mode,
+    // otherwise a random increasing set of indices (or the full range when every column is wanted)
+    ColumnSelection select()
+    {
+        if (settings.columns.slice)
+        {
+            int count = (int) columnCount.next();
+            int start;
+            if (count == settings.columns.maxColumnsPerKey)
+                start = 0;
+            else
+                // NOTE(review): because of the leading 1 +, a partial slice never starts at column 0 - confirm intended
+                start = 1 + ThreadLocalRandom.current().nextInt(settings.columns.maxColumnsPerKey - count);
+            return new ColumnSelection(null, start, start + count);
+        }
+
+        int count = (int) columnCount.next();
+        int totalCount = settings.columns.names.size();
+        if (count == settings.columns.names.size())
+            return new ColumnSelection(null, 0, count);
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+        int[] indices = new int[count];
+        // c counts the indices chosen so far, o the columns skipped so far; each pick skips a random
+        // share of the remaining slack, so the chosen indices (o + c) come out in increasing order
+        int c = 0, o = 0;
+        while (c < count && count + o < totalCount)
+        {
+            int leeway = totalCount - (count + o);
+            int spreadover = count - c;
+            o += Math.round(rnd.nextDouble() * (leeway / (double) spreadover));
+            indices[c] = o + c;
+            c++;
+        }
+        // no slack left: the remaining picks are consecutive
+        while (c < count)
+        {
+            indices[c] = o + c;
+            c++;
+        }
+        return new ColumnSelection(indices, 0, 0);
+    }
+
+    // values of every column, i.e. the range [0, settings.columns.names.size())
+    protected List<ByteBuffer> getColumnValues()
+    {
+        return getColumnValues(new ColumnSelection(null, 0, settings.columns.names.size()));
+    }
+
+    // values of the selected columns, read from the first row produced by the first partition
+    protected List<ByteBuffer> getColumnValues(ColumnSelection columns)
+    {
+        // NOTE(review): iterator(1).batch(1f) presumably yields the partition's single row - confirm against the generator API
+        Row row = partitions.get(0).iterator(1).batch(1f).iterator().next();
+        ByteBuffer[] r = new ByteBuffer[columns.count()];
+        int c = 0;
+        if (columns.indices != null)
+            for (int i : columns.indices)
+                r[c++] = (ByteBuffer) row.get(i);
+        else
+            for (int i = columns.lb ; i < columns.ub ; i++)
+                r[c++] = (ByteBuffer) row.get(i);
+        return Arrays.asList(r);
+    }
+
+    // Factory: maps the command and the configured connection style to the concrete operation.
+    // THRIFT selects the Thrift implementation, CQL and CQL_PREPARED the CQL implementation;
+    // any other command or style is rejected with UnsupportedOperationException.
+    public static Operation operation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings, DistributionFactory counteradd)
+    {
+        switch (type)
+        {
+            case READ:
+                switch(settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftReader(timer, generator, settings);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlReader(timer, generator, settings);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
+
+
+            case COUNTER_READ:
+                switch(settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftCounterGetter(timer, generator, settings);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlCounterGetter(timer, generator, settings);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
+
+            case WRITE:
+
+                switch(settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftInserter(timer, generator, settings);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlInserter(timer, generator, settings);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
+
+            case COUNTER_WRITE:
+                switch(settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftCounterAdder(counteradd, timer, generator, settings);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlCounterAdder(counteradd, timer, generator, settings);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
+
+        }
+
+        throw new UnsupportedOperationException();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
new file mode 100644
index 0000000..ee766c3
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.*;
+
+/**
+ * Thrift implementation of the COUNTER_WRITE command: adds a drawn amount to each
+ * selected counter column of one key through a single batch_mutate call.
+ */
+public class ThriftCounterAdder extends PredefinedOperation
+{
+
+    // source of the amounts added to each counter
+    final Distribution counteradd;
+    public ThriftCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, StressSettings settings)
+    {
+        super(Command.COUNTER_WRITE, timer, generator, settings);
+        this.counteradd = counteradd.get();
+    }
+
+    public void run(final ThriftClient client) throws IOException
+    {
+        // one counter mutation per selected column name, each with its own drawn increment
+        final List<ByteBuffer> selectedNames = select().select(settings.columns.names);
+        final List<Mutation> mutations = new ArrayList<>(selectedNames.size());
+        for (ByteBuffer name : selectedNames)
+        {
+            final CounterColumn counter = new CounterColumn(name, counteradd.next());
+            final ColumnOrSuperColumn wrapped = new ColumnOrSuperColumn().setCounter_column(counter);
+            mutations.add(new Mutation().setColumn_or_supercolumn(wrapped));
+        }
+
+        // batch_mutate takes key -> (table -> mutations)
+        final Map<String, List<Mutation>> byTable = Collections.singletonMap(type.table, mutations);
+        final ByteBuffer key = getKey();
+        final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, byTable);
+
+        timeWithRetry(new RunOp()
+        {
+            @Override
+            public int partitionCount()
+            {
+                return 1;
+            }
+
+            @Override
+            public int rowCount()
+            {
+                return 1;
+            }
+
+            @Override
+            public boolean run() throws Exception
+            {
+                client.batch_mutate(record, settings.command.consistencyLevel);
+                return true;
+            }
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
new file mode 100644
index 0000000..10c6aab
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+
+/**
+ * Thrift implementation of the COUNTER_READ command: reads the selected columns
+ * of one key with get_slice and succeeds when at least one column comes back.
+ */
+public class ThriftCounterGetter extends PredefinedOperation
+{
+    public ThriftCounterGetter(Timer timer, PartitionGenerator generator, StressSettings settings)
+    {
+        super(Command.COUNTER_READ, timer, generator, settings);
+    }
+
+    public void run(final ThriftClient client) throws IOException
+    {
+        final SlicePredicate predicate = select().predicate();
+        final ByteBuffer key = getKey();
+        timeWithRetry(new RunOp()
+        {
+            @Override
+            public int partitionCount()
+            {
+                return 1;
+            }
+
+            @Override
+            public int rowCount()
+            {
+                return 1;
+            }
+
+            @Override
+            public boolean run() throws Exception
+            {
+                final ColumnParent parent = new ColumnParent(type.table);
+                final List<?> columns = client.get_slice(key, parent, predicate, settings.command.consistencyLevel);
+                // a null or empty slice counts as a failed read
+                if (columns == null)
+                    return false;
+                return !columns.isEmpty();
+            }
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
new file mode 100644
index 0000000..5c2acfe
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+public final class ThriftInserter extends PredefinedOperation
+{
+
+    public ThriftInserter(Timer timer, PartitionGenerator generator, StressSettings settings)
+    {
+        super(Command.WRITE, timer, generator, settings);
+    }
+
+    public void run(final ThriftClient client) throws IOException
+    {
+        final ByteBuffer key = getKey();
+        final List<Column> columns = getColumns();
+
+        List<Mutation> mutations = new ArrayList<>(columns.size());
+        for (Column c : columns)
+        {
+            ColumnOrSuperColumn column = new ColumnOrSuperColumn().setColumn(c);
+            mutations.add(new Mutation().setColumn_or_supercolumn(column));
+        }
+        Map<String, List<Mutation>> row = Collections.singletonMap(type.table, mutations);
+
+        final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, row);
+
+        timeWithRetry(new RunOp()
+        {
+            @Override
+            public boolean run() throws Exception
+            {
+                client.batch_mutate(record, settings.command.consistencyLevel);
+                return true;
+            }
+
+            @Override
+            public int partitionCount()
+            {
+                return 1;
+            }
+
+            @Override
+            public int rowCount()
+            {
+                return 1;
+            }
+        });
+    }
+
+    protected List<Column> getColumns()
+    {
+        final ColumnSelection selection = select();
+        final List<ByteBuffer> values = getColumnValues(selection);
+        final List<Column> columns = new ArrayList<>(values.size());
+        final List<ByteBuffer> names = select().select(settings.columns.names);
+        for (int i = 0 ; i < values.size() ; i++)
+            columns.add(new Column(names.get(i))
+                        .setValue(values.get(i))
+                        .setTimestamp(FBUtilities.timestampMicros()));
+        return columns;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
new file mode 100644
index 0000000..276d8c5
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SuperColumn;
+
+/**
+ * Predefined read operation that fetches one key through the Thrift {@code get_slice} call
+ * and, when expected values are available, checks the returned columns against them.
+ */
+public final class ThriftReader extends PredefinedOperation
+{
+
+    public ThriftReader(Timer timer, PartitionGenerator generator, StressSettings settings)
+    {
+        super(Command.READ, timer, generator, settings);
+    }
+
+    /**
+     * Reads the selected columns of one key and reports success to {@code timeWithRetry}.
+     *
+     * @param client Thrift client the slice query is sent through
+     * @throws IOException presumably raised by {@code timeWithRetry} when the read fails (not visible here)
+     */
+    public void run(final ThriftClient client) throws IOException
+    {
+        final ColumnSelection select = select();
+        final ByteBuffer key = getKey();
+        final List<ByteBuffer> expect = getColumnValues(select);
+        timeWithRetry(new RunOp()
+        {
+            @Override
+            public boolean run() throws Exception
+            {
+                List<ColumnOrSuperColumn> row = client.get_slice(key, new ColumnParent(type.table), select.predicate(), settings.command.consistencyLevel);
+                // Reject a missing result first so that neither branch below dereferences null.
+                if (row == null)
+                    return false;
+                // Without expected values, any non-empty result counts as success.
+                if (expect == null)
+                    return !row.isEmpty();
+                if (row.size() != expect.size())
+                    return false;
+                // Compare values position by position.
+                for (int i = 0 ; i < row.size() ; i++)
+                    if (!row.get(i).getColumn().bufferForValue().equals(expect.get(i)))
+                        return false;
+                return true;
+            }
+
+            @Override
+            public int partitionCount()
+            {
+                return 1;
+            }
+
+            @Override
+            public int rowCount()
+            {
+                return 1;
+            }
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
new file mode 100644
index 0000000..7c5efac
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -0,0 +1,144 @@
+package org.apache.cassandra.stress.operations.userdefined;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.Partition;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.RatioDistribution;
+import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.settings.ValidationType;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+
+public class SchemaInsert extends SchemaStatement
+{
+
+    private final BatchStatement.Type batchType;
+    private final RatioDistribution perVisit;
+    private final RatioDistribution perBatch;
+
+    public SchemaInsert(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution partitionCount, RatioDistribution perVisit, RatioDistribution perBatch, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType)
+    {
+        super(timer, generator, settings, partitionCount, statement, thriftId, cl, ValidationType.NOT_FAIL);
+        this.batchType = batchType;
+        this.perVisit = perVisit;
+        this.perBatch = perBatch;
+    }
+
+    private class JavaDriverRun extends Runner
+    {
+        final JavaDriverClient client;
+
+        private JavaDriverRun(JavaDriverClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            Partition.RowIterator[] iterators = new Partition.RowIterator[partitions.size()];
+            for (int i = 0 ; i < iterators.length ; i++)
+                iterators[i] = partitions.get(i).iterator(perVisit.next());
+            List<BoundStatement> stmts = new ArrayList<>();
+            partitionCount = partitions.size();
+
+            boolean done;
+            do
+            {
+                done = true;
+                stmts.clear();
+                for (Partition.RowIterator iterator : iterators)
+                {
+                    if (iterator.done())
+                        continue;
+
+                    for (Row row : iterator.batch(perBatch.next()))
+                        stmts.add(bindRow(row));
+
+                    done &= iterator.done();
+                }
+
+                rowCount += stmts.size();
+
+                Statement stmt;
+                if (stmts.size() == 1)
+                {
+                    stmt = stmts.get(0);
+                }
+                else
+                {
+                    BatchStatement batch = new BatchStatement(batchType);
+                    batch.setConsistencyLevel(JavaDriverClient.from(cl));
+                    batch.addAll(stmts);
+                    stmt = batch;
+                }
+                validate(client.getSession().execute(stmt));
+
+            } while (!done);
+
+            return true;
+        }
+    }
+
+    private class ThriftRun extends Runner
+    {
+        final ThriftClient client;
+
+        private ThriftRun(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            Partition.RowIterator[] iterators = new Partition.RowIterator[partitions.size()];
+            for (int i = 0 ; i < iterators.length ; i++)
+                iterators[i] = partitions.get(i).iterator(perVisit.next());
+            partitionCount = partitions.size();
+
+            boolean done;
+            do
+            {
+                done = true;
+                for (Partition.RowIterator iterator : iterators)
+                {
+                    if (iterator.done())
+                        continue;
+
+                    for (Row row : iterator.batch(perBatch.next()))
+                    {
+                        validate(client.execute_prepared_cql3_query(thriftId, iterator.partition().getToken(), thriftRowArgs(row), settings.command.consistencyLevel));
+                        rowCount += 1;
+                    }
+
+                    done &= iterator.done();
+                }
+            } while (!done);
+
+            return true;
+        }
+    }
+
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        timeWithRetry(new JavaDriverRun(client));
+    }
+
+    @Override
+    public void run(ThriftClient client) throws IOException
+    {
+        timeWithRetry(new ThriftRun(client));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
new file mode 100644
index 0000000..9cec39b
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -0,0 +1,107 @@
+package org.apache.cassandra.stress.operations.userdefined;
+
+import java.io.IOException;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.OptionDistribution;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.settings.ValidationType;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.ThriftConversion;
+
+/**
+ * Query operation for a user-defined schema: executes the prepared statement against the
+ * operation's first partition, with bind values sampled from that partition.
+ */
+public class SchemaQuery extends SchemaStatement
+{
+
+    public SchemaQuery(Timer timer, PartitionGenerator generator, StressSettings settings, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, ValidationType validationType)
+    {
+        super(timer, generator, settings, OptionDistribution.get("fixed(1)").get(), statement, thriftId, cl, validationType);
+    }
+
+    /** Executes the query through the Java driver with randomly bound arguments. */
+    private ResultSet query(JavaDriverClient client)
+    {
+        return client.getSession().execute(bindRandom(partitions.get(0)));
+    }
+
+    /** Executes the query as a prepared CQL3 query over Thrift with randomly chosen arguments. */
+    private CqlResult query(ThriftClient client) throws Exception
+    {
+        return client.execute_prepared_cql3_query(thriftId, partitions.get(0).getToken(), thriftRandomArgs(partitions.get(0)), ThriftConversion.toThrift(cl));
+    }
+
+    /** @return the number of rows the Java driver query produced */
+    int execute(JavaDriverClient client) throws Exception
+    {
+        return query(client).all().size();
+    }
+
+    /** @return the row count reported by the Thrift query result */
+    int execute(ThriftClient client) throws Exception
+    {
+        return query(client).getRowsSize();
+    }
+
+    /** Runner that issues the query through the Java driver and records the resulting counts. */
+    private class JavaDriverRun extends Runner
+    {
+        final JavaDriverClient client;
+
+        private JavaDriverRun(JavaDriverClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            ResultSet result = query(client);
+            validate(result);
+            rowCount = result.all().size();
+            // counts one partition when rows came back, none otherwise
+            partitionCount = Math.min(1, rowCount);
+            return true;
+        }
+    }
+
+    /** Runner that issues the query over Thrift and records the resulting counts. */
+    private class ThriftRun extends Runner
+    {
+        final ThriftClient client;
+
+        private ThriftRun(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            CqlResult result = query(client);
+            validate(result);
+            rowCount = result.getRowsSize();
+            partitionCount = Math.min(1, rowCount);
+            return true;
+        }
+    }
+
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        timeWithRetry(new JavaDriverRun(client));
+    }
+
+    @Override
+    public void run(ThriftClient client) throws IOException
+    {
+        timeWithRetry(new ThriftRun(client));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
new file mode 100644
index 0000000..aac40c5
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -0,0 +1,224 @@
+package org.apache.cassandra.stress.operations.userdefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.Partition;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.settings.ValidationType;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.transport.SimpleClient;
+
+/**
+ * Base class for operations that execute a user-supplied prepared statement, either through
+ * the Java driver or through Thrift prepared CQL3 queries.
+ *
+ * Each bind variable of the statement is mapped once, by name, to a column index of the
+ * {@link PartitionGenerator}; the bind helpers use that mapping to pull values out of
+ * generated rows. The bind buffers are instance fields that are overwritten on every call.
+ */
+public abstract class SchemaStatement extends Operation
+{
+
+    final PartitionGenerator generator;
+    private final PreparedStatement statement;
+    final Integer thriftId;
+    final ConsistencyLevel cl;
+    final ValidationType validationType;
+    // generator column index of each bind variable, in bind order
+    private final int[] argumentIndex;
+    // scratch array handed to PreparedStatement.bind(); overwritten by every bind call
+    private final Object[] bindBuffer;
+    // scratch rows sampled from a partition; each row holds one value per bind variable
+    private final Object[][] randomBuffer;
+    private final Random random = new Random();
+
+    /**
+     * Stores the statement settings and resolves every bind variable of {@code statement} to a
+     * generator column index via {@code generator.indexOf(name)}. {@code timer},
+     * {@code generator}, {@code settings} and {@code partitionCount} are forwarded to
+     * {@link Operation}.
+     *
+     * @param statement      prepared statement bound by {@code bindRandom} and {@code bindRow}
+     * @param thriftId       prepared-statement id used by subclasses for Thrift execution
+     * @param cl             consistency level made available to subclasses
+     * @param validationType how {@code validate} checks a result
+     */
+    public SchemaStatement(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution partitionCount,
+                           PreparedStatement statement, Integer thriftId, ConsistencyLevel cl, ValidationType validationType)
+    {
+        super(timer, generator, settings, partitionCount);
+        this.generator = generator;
+        this.statement = statement;
+        this.thriftId = thriftId;
+        this.cl = cl;
+        this.validationType = validationType;
+        argumentIndex = new int[statement.getVariables().size()];
+        bindBuffer = new Object[argumentIndex.length];
+        randomBuffer = new Object[argumentIndex.length][argumentIndex.length];
+        int i = 0;
+        for (ColumnDefinitions.Definition definition : statement.getVariables())
+            argumentIndex[i++] = generator.indexOf(definition.getName());
+    }
+
+    /**
+     * Copies the bind-variable values of up to {@code randomBuffer.length} rows of
+     * {@code partition} into {@code randomBuffer}.
+     *
+     * @return the number of rows copied
+     */
+    private int fillRandom(Partition partition)
+    {
+        int c = 0;
+        for (Row row : partition.iterator(randomBuffer.length).batch(1f))
+        {
+            Object[] randomRow = randomBuffer[c++];
+            for (int i = 0 ; i < argumentIndex.length ; i++)
+                randomRow[i] = row.get(argumentIndex[i]);
+            if (c >= randomBuffer.length)
+                break;
+        }
+        return c;
+    }
+
+    /**
+     * Binds the statement with values sampled from {@code partition}. Each bind variable takes
+     * its value from a randomly chosen sampled row, except that variables whose generator index
+     * is negative always use the first sampled row.
+     * NOTE(review): negative indexes presumably denote partition-key columns - confirm against
+     * {@code PartitionGenerator.indexOf}.
+     *
+     * @throws IllegalArgumentException from {@code Random.nextInt(0)} when no row was sampled and
+     *                                  a variable has a non-negative index
+     */
+    BoundStatement bindRandom(Partition partition)
+    {
+        int c = fillRandom(partition);
+        for (int i = 0 ; i < argumentIndex.length ; i++)
+        {
+            int argIndex = argumentIndex[i];
+            bindBuffer[i] = randomBuffer[argIndex < 0 ? 0 : random.nextInt(c)][i];
+        }
+        return statement.bind(bindBuffer);
+    }
+
+    /** Binds the statement with the values {@code row} holds for each bind variable. */
+    BoundStatement bindRow(Row row)
+    {
+        for (int i = 0 ; i < argumentIndex.length ; i++)
+            bindBuffer[i] = row.get(argumentIndex[i]);
+        return statement.bind(bindBuffer);
+    }
+
+    /** Converts the values {@code row} holds for each bind variable into Thrift arguments. */
+    List<ByteBuffer> thriftRowArgs(Row row)
+    {
+        List<ByteBuffer> args = new ArrayList<>(argumentIndex.length);
+        for (int i : argumentIndex)
+            args.add(generator.convert(i, row.get(i)));
+        return args;
+    }
+
+    /**
+     * Thrift counterpart of {@link #bindRandom(Partition)}: samples rows from {@code partition}
+     * and converts one value per bind variable with {@code generator.convert}.
+     */
+    List<ByteBuffer> thriftRandomArgs(Partition partition)
+    {
+        List<ByteBuffer> args = new ArrayList<>(argumentIndex.length);
+        int c = fillRandom(partition);
+        for (int i = 0 ; i < argumentIndex.length ; i++)
+        {
+            int argIndex = argumentIndex[i];
+            args.add(generator.convert(argIndex, randomBuffer[argIndex < 0 ? 0 : random.nextInt(c)][i]));
+        }
+        return args;
+    }
+
+    /**
+     * Checks a Java driver result against {@code validationType} without consuming its rows.
+     *
+     * @throws IllegalStateException if NON_ZERO validation finds no rows, or the validation type
+     *                               is not supported
+     */
+    void validate(ResultSet rs)
+    {
+        switch (validationType)
+        {
+            case NOT_FAIL:
+                return;
+            case NON_ZERO:
+                // isExhausted() leaves the rows in place; rs.all() would drain the result set
+                // and make any later rs.all() by the caller come back empty.
+                if (rs.isExhausted())
+                    throw new IllegalStateException("Expected non-zero results");
+                break;
+            default:
+                throw new IllegalStateException("Unsupported validation type");
+        }
+    }
+
+    /**
+     * Checks a Thrift CQL result against {@code validationType}.
+     *
+     * @throws IllegalStateException if NON_ZERO validation finds no rows, or the validation type
+     *                               is not supported
+     */
+    void validate(CqlResult rs)
+    {
+        switch (validationType)
+        {
+            case NOT_FAIL:
+                return;
+            case NON_ZERO:
+                if (rs.getRowsSize() == 0)
+                    throw new IllegalStateException("Expected non-zero results");
+                break;
+            default:
+                throw new IllegalStateException("Unsupported validation type");
+        }
+    }
+
+    /** Running through a {@code SimpleClient} is not supported by schema statements. */
+    @Override
+    public void run(SimpleClient client) throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@code RunOp} base whose counts are the two fields, filled in by subclasses as they run. */
+    abstract class Runner implements RunOp
+    {
+        int partitionCount;
+        int rowCount;
+
+        @Override
+        public int partitionCount()
+        {
+            return partitionCount;
+        }
+
+        @Override
+        public int rowCount()
+        {
+            return rowCount;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
index ac10014..7138cbb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
@@ -41,19 +41,6 @@ public enum Command
             "Interleaving of any basic commands, with configurable ratio and distribution - the cluster must first be populated by a write test",
             CommandCategory.MIXED
     ),
-    RANGE_SLICE(false, "Standard1", "Super1",
-            "Range slice queries - the cluster must first be populated by a write test",
-            CommandCategory.MULTI
-    ),
-    INDEXED_RANGE_SLICE(false, "Standard1", "Super1",
-            "Range slice queries through a secondary index. The cluster must first be populated by a write test, with indexing enabled.",
-            CommandCategory.BASIC
-    ),
-    READ_MULTI(false, "Standard1", "Super1",
-            "multi_read",
-            "Multiple concurrent reads fetching multiple rows at once. The cluster must first be populated by a write test.",
-            CommandCategory.MULTI
-    ),
     COUNTER_WRITE(true, "Counter1", "SuperCounter1",
             "counter_add",
             "Multiple concurrent updates of counters.",
@@ -64,6 +51,10 @@ public enum Command
             "Multiple concurrent reads of counters. The cluster must first be populated by a counterwrite test.",
             CommandCategory.BASIC
     ),
+    USER(true, null, null,
+          "Interleaving of user provided queries, with configurable ratio and distribution",
+          CommandCategory.USER
+    ),
 
     HELP(false, null, null, "-?", "Print help for a command or option", null),
     PRINT(false, null, null, "Inspect the output of a distribution definition", null),
@@ -136,11 +127,12 @@ public enum Command
         }
         switch (category)
         {
+            case USER:
+                return SettingsCommandUser.helpPrinter();
             case BASIC:
-            case MULTI:
-                return SettingsCommand.helpPrinter(this);
+                return SettingsCommandPreDefined.helpPrinter(this);
             case MIXED:
-                return SettingsCommandMixed.helpPrinter();
+                return SettingsCommandPreDefinedMixed.helpPrinter();
         }
         throw new AssertionError();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/settings/CommandCategory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/CommandCategory.java b/tools/stress/src/org/apache/cassandra/stress/settings/CommandCategory.java
index 4372f59..e9dd946 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/CommandCategory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/CommandCategory.java
@@ -24,6 +24,6 @@ package org.apache.cassandra.stress.settings;
 public enum CommandCategory
 {
     BASIC,
-    MULTI,
-    MIXED
+    MIXED,
+    USER
 }


Mime
View raw message