cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xe...@apache.org
Subject [4/6] Improve Stress Tool patch by Benedict; reviewed by Pavel Yaskevich for CASSANDRA-6199
Date Tue, 24 Dec 2013 02:08:43 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
new file mode 100644
index 0000000..1f734be
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Function;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.settings.ConnectionStyle;
+import org.apache.cassandra.stress.settings.CqlVersion;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.ThriftConversion;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+
+/**
+ * Base class for CQL stress operations: builds one CQL statement, dispatches it
+ * through whichever client flavour is configured (thrift, native transport, or
+ * java-driver) and validates the typed result V.
+ */
+public abstract class CqlOperation<V> extends Operation
+{
+
+    // parameter values to substitute/bind for the '?' markers, in order of appearance
+    protected abstract List<ByteBuffer> getQueryParameters(byte[] key);
+    // CQL text of the statement to run (may contain '?' markers)
+    protected abstract String buildQuery();
+    // wraps client + statement (raw text OR prepared-statement id) into a runnable, validating op
+    protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key);
+
+    /**
+     * @param state shared per-thread stress state (settings, caches, metrics)
+     * @param idx   operation index within the run
+     * @throws IllegalStateException for settings combinations with no CQL implementation
+     */
+    public CqlOperation(State state, long idx)
+    {
+        super(state, idx);
+        if (state.settings.columns.useSuperColumns)
+            throw new IllegalStateException("Super columns are not implemented for CQL");
+        if (state.settings.columns.variableColumnCount)
+            throw new IllegalStateException("Variable column counts are not implemented for CQL");
+    }
+
+    /**
+     * Builds (or fetches from the per-state cache) the statement for this operation,
+     * wraps it in a CqlRunOp and executes it via timeWithRetry.
+     *
+     * @param client  wrapped client to execute against
+     * @param queryParams values for the statement's '?' markers
+     * @param key     partition key being operated on
+     * @param keyid   printable id of the key, used for error reporting
+     * @return the executed op (its result field is populated by run())
+     */
+    protected CqlRunOp<V> run(final ClientWrapper client, final List<ByteBuffer> queryParams, final ByteBuffer key, final String keyid) throws IOException
+    {
+        final CqlRunOp<V> op;
+        if (state.settings.mode.style == ConnectionStyle.CQL_PREPARED)
+        {
+            // prepared path: cache holds the client-specific prepared-statement id
+            final Object id;
+            Object idobj = state.getCqlCache();
+            if (idobj == null)
+            {
+                try
+                {
+                    id = client.createPreparedStatement(buildQuery());
+                } catch (TException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                state.storeCqlCache(id);
+            }
+            else
+                id = idobj;
+
+            op = buildRunOp(client, null, id, queryParams, keyid, key);
+        }
+        else
+        {
+            // unprepared path: cache holds the raw query string
+            final String query;
+            Object qobj = state.getCqlCache();
+            if (qobj == null)
+                state.storeCqlCache(query = buildQuery());
+            else
+                query = qobj.toString();
+
+            op = buildRunOp(client, query, null, queryParams, keyid, key);
+        }
+
+        timeWithRetry(op);
+        return op;
+    }
+
+    // Convenience entry point: derives params and key id from the generated key
+    protected void run(final ClientWrapper client) throws IOException
+    {
+        final byte[] key = getKey().array();
+        final List<ByteBuffer> queryParams = getQueryParameters(key);
+        run(client, queryParams, ByteBuffer.wrap(key), new String(key));
+    }
+
+    // Classes to process Cql results
+
+    // Always succeeds so long as the query executes without error; provides a keyCount to increment on instantiation
+    protected final class CqlRunOpAlwaysSucceed extends CqlRunOp<Integer>
+    {
+
+        // number of keys this op accounts for in metrics (fixed at construction)
+        final int keyCount;
+
+        protected CqlRunOpAlwaysSucceed(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key, int keyCount)
+        {
+            super(client, query, queryId, RowCountHandler.INSTANCE, params, id, key);
+            this.keyCount = keyCount;
+        }
+
+        // success requires only that the query executed without error
+        @Override
+        public boolean validate(Integer result)
+        {
+            return true;
+        }
+
+        @Override
+        public int keyCount()
+        {
+            return keyCount;
+        }
+    }
+
+    // Succeeds so long as the result set is nonempty, and the query executes without error
+    protected final class CqlRunOpTestNonEmpty extends CqlRunOp<Integer>
+    {
+
+        protected CqlRunOpTestNonEmpty(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key)
+        {
+            super(client, query, queryId, RowCountHandler.INSTANCE, params, id, key);
+        }
+
+        /**
+         * Enforce the "non-empty result set" contract this class exists for.
+         * Previously this returned true unconditionally, so an empty result
+         * passed validation; the pre-patch slicer/reader code required
+         * rowCount != 0, so restore that check.
+         */
+        @Override
+        public boolean validate(Integer result)
+        {
+            return result != null && result > 0;
+        }
+
+        @Override
+        public int keyCount()
+        {
+            return result;
+        }
+    }
+
+    // Requires a custom validate() method, but fetches and stores the keys from the result set for further processing
+    protected abstract class CqlRunOpFetchKeys extends CqlRunOp<byte[][]>
+    {
+
+        protected CqlRunOpFetchKeys(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key)
+        {
+            super(client, query, queryId, KeysHandler.INSTANCE, params, id, key);
+        }
+
+        // NOTE(review): result may be null when the thrift handler sees a
+        // non-Rows response (see KeysHandler.thriftHandler) — would NPE here
+        @Override
+        public int keyCount()
+        {
+            return result.length;
+        }
+
+    }
+
+    // Base timed operation: executes one statement (raw or prepared) against the
+    // wrapped client, converts the response via the supplied ResultHandler, and
+    // validates it via the subclass's validate() method.
+    protected abstract class CqlRunOp<V> implements RunOp
+    {
+
+        final ClientWrapper client;
+        final String query;     // raw CQL text, null when running prepared
+        final Object queryId;   // prepared-statement id, null when running raw
+        final List<ByteBuffer> params;
+        final String id;        // printable key id, for error reporting
+        final ByteBuffer key;
+        final ResultHandler<V> handler;
+        V result;               // populated by run()
+
+        private CqlRunOp(ClientWrapper client, String query, Object queryId, ResultHandler<V> handler, List<ByteBuffer> params, String id, ByteBuffer key)
+        {
+            this.client = client;
+            this.query = query;
+            this.queryId = queryId;
+            this.handler = handler;
+            this.params = params;
+            this.id = id;
+            this.key = key;
+        }
+
+        // runs prepared when a queryId was supplied, raw otherwise
+        @Override
+        public boolean run() throws Exception
+        {
+            return queryId != null
+            ? validate(result = client.execute(queryId, key, params, handler))
+            : validate(result = client.execute(query, key, params, handler));
+        }
+
+        @Override
+        public String key()
+        {
+            return id;
+        }
+
+        public abstract boolean validate(V result);
+
+    }
+
+
+    // Adapters from the three concrete client types onto the common ClientWrapper
+    // interface, so the operation logic above is written once.
+
+
+    @Override
+    public void run(final ThriftClient client) throws IOException
+    {
+        run(wrap(client));
+    }
+
+    @Override
+    public void run(SimpleClient client) throws IOException
+    {
+        run(wrap(client));
+    }
+
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        run(wrap(client));
+    }
+
+    // thrift speaks either CQL2 or CQL3 depending on the configured version
+    public ClientWrapper wrap(ThriftClient client)
+    {
+        return state.isCql3()
+                ? new Cql3CassandraClientWrapper(client)
+                : new Cql2CassandraClientWrapper(client);
+
+    }
+
+    public ClientWrapper wrap(JavaDriverClient client)
+    {
+        return new JavaDriverWrapper(client);
+    }
+
+    public ClientWrapper wrap(SimpleClient client)
+    {
+        return new SimpleClientWrapper(client);
+    }
+
+    // Uniform client facade: prepare a statement, execute prepared (by id),
+    // or execute raw CQL text; each impl converts its native response via the handler
+    protected interface ClientWrapper
+    {
+        Object createPreparedStatement(String cqlQuery) throws TException;
+        <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException;
+        <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException;
+    }
+
+    // ClientWrapper over the DataStax java-driver
+    private final class JavaDriverWrapper implements ClientWrapper
+    {
+        final JavaDriverClient client;
+        private JavaDriverWrapper(JavaDriverClient client)
+        {
+            this.client = client;
+        }
+
+        // raw path: parameters are substituted inline into the query text;
+        // the key argument is unused by this client
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+        {
+            String formattedQuery = formatCqlQuery(query, queryParams, state.isCql3());
+            return handler.javaDriverHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
+        }
+
+        // prepared path: id is the driver's PreparedStatement object
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+        {
+            return handler.javaDriverHandler().apply(
+                    client.executePrepared(
+                            (PreparedStatement) preparedStatementId,
+                            queryParams,
+                            ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
+        }
+
+        @Override
+        public Object createPreparedStatement(String cqlQuery)
+        {
+            return client.prepare(cqlQuery);
+        }
+    }
+
+    // ClientWrapper over the native-transport SimpleClient
+    // (responses are ResultMessage, handled by the — misleadingly named — thriftHandler)
+    private final class SimpleClientWrapper implements ClientWrapper
+    {
+        final SimpleClient client;
+        private SimpleClientWrapper(SimpleClient client)
+        {
+            this.client = client;
+        }
+
+        // raw path: parameters are substituted inline; key is unused by this client
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+        {
+            String formattedQuery = formatCqlQuery(query, queryParams, state.isCql3());
+            return handler.thriftHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
+        }
+
+        // prepared path: id is the server-assigned statement-id bytes
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+        {
+            return handler.thriftHandler().apply(
+                    client.executePrepared(
+                            (byte[]) preparedStatementId,
+                            queryParams,
+                            ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
+        }
+
+        @Override
+        public Object createPreparedStatement(String cqlQuery)
+        {
+            return client.prepare(cqlQuery).statementId.bytes;
+        }
+    }
+
+    // client wrapper for Cql3 over thrift
+    private final class Cql3CassandraClientWrapper implements ClientWrapper
+    {
+        final ThriftClient client;
+        private Cql3CassandraClientWrapper(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        // raw path: parameters are substituted inline into the query text
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+        {
+            String formattedQuery = formatCqlQuery(query, queryParams, true);
+            // send the formatted query; previously the raw query (with unbound
+            // '?' markers) was sent and formattedQuery was computed but unused
+            return handler.simpleNativeHandler().apply(
+                    client.execute_cql3_query(formattedQuery, key, Compression.NONE, state.settings.command.consistencyLevel)
+            );
+        }
+
+        // prepared path: id is the thrift prepared-statement itemId
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+        {
+            Integer id = (Integer) preparedStatementId;
+            return handler.simpleNativeHandler().apply(
+                    client.execute_prepared_cql3_query(id, key, queryParams, state.settings.command.consistencyLevel)
+            );
+        }
+
+        @Override
+        public Object createPreparedStatement(String cqlQuery) throws TException
+        {
+            return client.prepare_cql3_query(cqlQuery, Compression.NONE);
+        }
+    }
+
+    // client wrapper for Cql2 over thrift
+    private final class Cql2CassandraClientWrapper implements ClientWrapper
+    {
+        final ThriftClient client;
+        private Cql2CassandraClientWrapper(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        // raw path: parameters are substituted inline into the query text
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+        {
+            String formattedQuery = formatCqlQuery(query, queryParams, false);
+            return handler.simpleNativeHandler().apply(
+                    client.execute_cql_query(formattedQuery, key, Compression.NONE)
+            );
+        }
+
+        // prepared path: id is the thrift prepared-statement itemId
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+        {
+            Integer id = (Integer) preparedStatementId;
+            return handler.simpleNativeHandler().apply(
+                    client.execute_prepared_cql_query(id, key, queryParams)
+            );
+        }
+
+        @Override
+        public Object createPreparedStatement(String cqlQuery) throws TException
+        {
+            return client.prepare_cql_query(cqlQuery, Compression.NONE);
+        }
+    }
+
+    // interface for building functions to standardise results from each client:
+    // javaDriverHandler for the DataStax driver, thriftHandler (despite the name)
+    // for native-transport ResultMessage, simpleNativeHandler for thrift CqlResult
+    protected static interface ResultHandler<V>
+    {
+        Function<ResultSet, V> javaDriverHandler();
+        Function<ResultMessage, V> thriftHandler();
+        Function<CqlResult, V> simpleNativeHandler();
+    }
+
+    // Reduces each client's response to the number of rows returned
+    protected static class RowCountHandler implements ResultHandler<Integer>
+    {
+        static final RowCountHandler INSTANCE = new RowCountHandler();
+
+        @Override
+        public Function<ResultSet, Integer> javaDriverHandler()
+        {
+            return new Function<ResultSet, Integer>()
+            {
+                @Override
+                public Integer apply(ResultSet rows)
+                {
+                    if (rows == null)
+                        return 0;
+                    // all() drains the result set; fine here since only the count is needed
+                    return rows.all().size();
+                }
+            };
+        }
+
+        @Override
+        public Function<ResultMessage, Integer> thriftHandler()
+        {
+            return new Function<ResultMessage, Integer>()
+            {
+                @Override
+                public Integer apply(ResultMessage result)
+                {
+                    return result instanceof ResultMessage.Rows ? ((ResultMessage.Rows) result).result.size() : 0;
+                }
+            };
+        }
+
+        @Override
+        public Function<CqlResult, Integer> simpleNativeHandler()
+        {
+            return new Function<CqlResult, Integer>()
+            {
+
+                @Override
+                public Integer apply(CqlResult result)
+                {
+                    switch (result.getType())
+                    {
+                        case ROWS:
+                            return result.getRows().size();
+                        default:
+                            // non-row responses (e.g. void results) count as one
+                            return 1;
+                    }
+                }
+            };
+        }
+
+    }
+
+    // Processes results from each client into an array of all key bytes returned
+    // (assumes the key is the first column of each row — TODO confirm against queries)
+    protected static final class KeysHandler implements ResultHandler<byte[][]>
+    {
+        static final KeysHandler INSTANCE = new KeysHandler();
+
+        @Override
+        public Function<ResultSet, byte[][]> javaDriverHandler()
+        {
+            return new Function<ResultSet, byte[][]>()
+            {
+
+                @Override
+                public byte[][] apply(ResultSet result)
+                {
+
+                    if (result == null)
+                        return new byte[0][];
+                    List<Row> rows = result.all();
+                    byte[][] r = new byte[rows.size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                        r[i] = rows.get(i).getBytes(0).array();
+                    return r;
+                }
+            };
+        }
+
+        @Override
+        public Function<ResultMessage, byte[][]> thriftHandler()
+        {
+            return new Function<ResultMessage, byte[][]>()
+            {
+
+                @Override
+                public byte[][] apply(ResultMessage result)
+                {
+                    if (result instanceof ResultMessage.Rows)
+                    {
+                        ResultMessage.Rows rows = ((ResultMessage.Rows) result);
+                        byte[][] r = new byte[rows.result.size()][];
+                        for (int i = 0 ; i < r.length ; i++)
+                            r[i] = rows.result.rows.get(i).get(0).array();
+                        return r;
+                    }
+                    // NOTE(review): null (not an empty array) for non-Rows results —
+                    // callers that dereference the result (CqlRunOpFetchKeys.keyCount) would NPE
+                    return null;
+                }
+            };
+        }
+
+        @Override
+        public Function<CqlResult, byte[][]> simpleNativeHandler()
+        {
+            return new Function<CqlResult, byte[][]>()
+            {
+
+                @Override
+                public byte[][] apply(CqlResult result)
+                {
+                    byte[][] r = new byte[result.getRows().size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                        r[i] = result.getRows().get(i).getKey();
+                    return r;
+                }
+            };
+        }
+
+    }
+
+    // Hex-encodes a blob literal; CQL3 requires the 0x prefix, CQL2 takes bare hex.
+    private static String getUnQuotedCqlBlob(ByteBuffer term, boolean isCQL3)
+    {
+        String hex = ByteBufferUtil.bytesToHex(term);
+        if (isCQL3)
+            return "0x" + hex;
+        return hex;
+    }
+
+    /**
+     * Constructs a CQL query string by replacing instances of the character
+     * '?', with the corresponding parameter.
+     *
+     * @param query base query string to format
+     * @param parms sequence of query parameters, substituted in order
+     * @param isCql3 true to emit CQL3-style 0x-prefixed blob literals
+     * @return formatted CQL query string
+     */
+    private static String formatCqlQuery(String query, List<ByteBuffer> parms, boolean isCql3)
+    {
+        int marker, position = 0;
+        StringBuilder result = new StringBuilder();
+
+        if (-1 == (marker = query.indexOf('?')) || parms.size() == 0)
+            return query;
+
+        for (ByteBuffer parm : parms)
+        {
+            result.append(query.substring(position, marker));
+            result.append(getUnQuotedCqlBlob(parm, isCql3));
+
+            // resume the search at the character immediately after the '?' just
+            // replaced; searching from position + 1 (as before) skipped a
+            // placeholder that directly followed the previous one ("??")
+            position = marker + 1;
+            if (-1 == (marker = query.indexOf('?', position)))
+                break;
+        }
+
+        if (position < query.length())
+            result.append(query.substring(position));
+
+        return result.toString();
+    }
+
+    // CQL3 requires double-quoting of case-sensitive identifiers; CQL2 does not.
+    protected String wrapInQuotesIfRequired(String string)
+    {
+        if (state.settings.mode.cqlVersion == CqlVersion.CQL3)
+            return "\"" + string + "\"";
+        return string;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
index c01767b..467e754 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
@@ -21,98 +21,39 @@ package org.apache.cassandra.stress.operations;
  */
 
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.transport.SimpleClient;
-
-public class CqlRangeSlicer extends CQLOperation
+public class CqlRangeSlicer extends CqlOperation<Integer>
 {
-    private static String cqlQuery = null;
-    private int lastRowCount;
-
-    public CqlRangeSlicer(Session client, int idx)
+    public CqlRangeSlicer(State state, long idx)
     {
-        super(client, idx);
+        super(state, idx);
     }
 
-    protected void run(CQLQueryExecutor executor) throws IOException
+    @Override
+    protected List<ByteBuffer> getQueryParameters(byte[] key)
     {
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-            throw new RuntimeException("Super columns are not implemented for CQL");
-
-        if (cqlQuery == null)
-        {
-            StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
-                    .append(" ''..'' FROM Standard1");
-
-            if (session.cqlVersion.startsWith("2"))
-                query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
-
-            cqlQuery = query.append(" WHERE KEY > ?").toString();
-        }
-
-        String key = String.format("%0" +  session.getTotalKeysLength() + "d", index);
-        List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
-
-        TimerContext context = session.latency.time();
-
-        boolean success = false;
-        String exceptionMessage = null;
-
-        for (int t = 0; t < session.getRetryTimes(); t++)
-        {
-            if (success)
-                break;
+        return Collections.singletonList(ByteBuffer.wrap(key));
+    }
 
-            try
-            {
-                success = executor.execute(cqlQuery, queryParams);
-            }
-            catch (Exception e)
-            {
-                System.err.println(e);
-                exceptionMessage = getExceptionMessage(e);
-                success = false;
-            }
-        }
+    @Override
+    protected String buildQuery()
+    {
+        StringBuilder query = new StringBuilder("SELECT FIRST ").append(state.settings.columns.maxColumnsPerKey)
+                .append(" ''..'' FROM ").append(state.settings.schema.columnFamily);
 
-        if (!success)
-        {
-            error(String.format("Operation [%d] retried %d times - error executing range slice with offset %s %s%n",
-                                index,
-                                session.getRetryTimes(),
-                                key,
-                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-        }
+        if (state.isCql2())
+            query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
 
-        session.operations.getAndIncrement();
-        session.keys.getAndAdd(lastRowCount);
-        context.stop();
+        return query.append(" WHERE KEY > ?").toString();
     }
 
-    protected boolean validateThriftResult(CqlResult result)
+    @Override
+    protected CqlRunOp buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
     {
-        lastRowCount = result.rows.size();
-        return  lastRowCount != 0;
+        return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key);
     }
 
-    protected boolean validateNativeResult(ResultMessage result)
-    {
-        assert result instanceof ResultMessage.Rows;
-        lastRowCount = ((ResultMessage.Rows)result).result.size();
-        return lastRowCount != 0;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
index 70273c1..051fd18 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
@@ -21,116 +21,67 @@ package org.apache.cassandra.stress.operations;
  */
 
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.ThriftConversion;
-
-public class CqlReader extends CQLOperation
+public class CqlReader extends CqlOperation<Integer>
 {
-    private static String cqlQuery = null;
 
-    public CqlReader(Session client, int idx)
+    public CqlReader(State state, long idx)
     {
-        super(client, idx);
+        super(state, idx);
     }
 
-    protected void run(CQLQueryExecutor executor) throws IOException
+    @Override
+    protected String buildQuery()
     {
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-            throw new RuntimeException("Super columns are not implemented for CQL");
+        StringBuilder query = new StringBuilder("SELECT ");
 
-        if (cqlQuery == null)
+        if (state.settings.columns.names == null)
         {
-            StringBuilder query = new StringBuilder("SELECT ");
-
-            if (session.columnNames == null)
-            {
-                if (session.cqlVersion.startsWith("2"))
-                    query.append("FIRST ").append(session.getColumnsPerKey()).append(" ''..''");
-                else
-                    query.append("*");
-            }
+            if (state.isCql2())
+                query.append("FIRST ").append(state.settings.columns.maxColumnsPerKey).append(" ''..''");
             else
-            {
-                for (int i = 0; i < session.columnNames.size(); i++)
-                {
-                    if (i > 0) query.append(",");
-                    query.append('?');
-                }
-            }
-
-            query.append(" FROM ").append(wrapInQuotesIfRequired("Standard1"));
-
-            if (session.cqlVersion.startsWith("2"))
-                query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
-            query.append(" WHERE KEY=?");
-
-            cqlQuery = query.toString();
+                query.append("*");
         }
-
-        List<String> queryParams = new ArrayList<String>();
-        if (session.columnNames != null)
-            for (int i = 0; i < session.columnNames.size(); i++)
-                queryParams.add(getUnQuotedCqlBlob(session.columnNames.get(i).array(), session.cqlVersion.startsWith("3")));
-
-        byte[] key = generateKey();
-        queryParams.add(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
-
-        TimerContext context = session.latency.time();
-
-        boolean success = false;
-        String exceptionMessage = null;
-
-        for (int t = 0; t < session.getRetryTimes(); t++)
+        else
         {
-            if (success)
-                break;
-
-            try
-            {
-                success = executor.execute(cqlQuery, queryParams);
-            }
-            catch (Exception e)
+            for (int i = 0; i < state.settings.columns.names.size() ; i++)
             {
-                exceptionMessage = getExceptionMessage(e);
-                success = false;
+                if (i > 0)
+                    query.append(",");
+                query.append('?');
             }
         }
 
-        if (!success)
-        {
-            error(String.format("Operation [%d] retried %d times - error reading key %s %s%n with query %s",
-                                index,
-                                session.getRetryTimes(),
-                                new String(key),
-                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")",
-                                cqlQuery));
-        }
+        query.append(" FROM ").append(wrapInQuotesIfRequired(state.settings.schema.columnFamily));
 
-        session.operations.getAndIncrement();
-        session.keys.getAndIncrement();
-        context.stop();
+        if (state.isCql2())
+            query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
+        query.append(" WHERE KEY=?");
+        return query.toString();
     }
 
-    protected boolean validateThriftResult(CqlResult result)
+    @Override
+    protected List<ByteBuffer> getQueryParameters(byte[] key)
     {
-        return result.rows.get(0).columns.size() != 0;
+        if (state.settings.columns.names != null)
+        {
+            final List<ByteBuffer> queryParams = new ArrayList<>();
+            for (ByteBuffer name : state.settings.columns.names)
+                queryParams.add(name);
+            queryParams.add(ByteBuffer.wrap(key));
+            return queryParams;
+        }
+        return Collections.singletonList(ByteBuffer.wrap(key));
     }
 
-    protected boolean validateNativeResult(ResultMessage result)
+    @Override
+    protected CqlRunOp buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
     {
-        return result instanceof ResultMessage.Rows && ((ResultMessage.Rows)result).result.size() != 0;
+        return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
deleted file mode 100644
index b7c72a2..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-public class IndexedRangeSlicer extends Operation
-{
-    private static List<ByteBuffer> values = null;
-
-    public IndexedRangeSlicer(Session client, int index)
-    {
-        super(client, index);
-    }
-
-    public void run(CassandraClient client) throws IOException
-    {
-        if (values == null)
-            values = generateValues();
-
-        String format = "%0" + session.getTotalKeysLength() + "d";
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                                                      ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                                                      false, session.getColumnsPerKey()));
-
-        ColumnParent parent = new ColumnParent("Standard1");
-        int expectedPerValue = session.getNumKeys() / values.size();
-
-        ByteBuffer columnName = ByteBufferUtil.bytes("C1");
-
-        int received = 0;
-
-        String startOffset = String.format(format, 0);
-        ByteBuffer value = values.get(1); // only C1 column is indexed
-
-        IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
-
-        while (received < expectedPerValue)
-        {
-            IndexClause clause = new IndexClause(Arrays.asList(expression),
-                                                 ByteBufferUtil.bytes(startOffset),
-                                                 session.getKeysPerCall());
-
-            List<KeySlice> results = null;
-            TimerContext context = session.latency.time();
-
-            boolean success = false;
-            String exceptionMessage = null;
-
-            for (int t = 0; t < session.getRetryTimes(); t++)
-            {
-                if (success)
-                    break;
-
-                try
-                {
-                    results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
-                    success = (results.size() != 0);
-                }
-                catch (Exception e)
-                {
-                    exceptionMessage = getExceptionMessage(e);
-                    success = false;
-                }
-            }
-
-            if (!success)
-            {
-                error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
-                                    index,
-                                    session.getRetryTimes(),
-                                    startOffset,
-                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-            }
-
-            received += results.size();
-
-            // convert max key found back to an integer, and increment it
-            startOffset = String.format(format, (1 + getMaxKey(results)));
-
-            session.operations.getAndIncrement();
-            session.keys.getAndAdd(results.size());
-            context.stop();
-        }
-    }
-
-    /**
-     * Get maximum key from keySlice list
-     * @param keySlices list of the KeySlice objects
-     * @return maximum key value of the list
-     */
-    private int getMaxKey(List<KeySlice> keySlices)
-    {
-        byte[] firstKey = keySlices.get(0).getKey();
-        int maxKey = ByteBufferUtil.toInt(ByteBuffer.wrap(firstKey));
-
-        for (KeySlice k : keySlices)
-        {
-            int currentKey = ByteBufferUtil.toInt(ByteBuffer.wrap(k.getKey()));
-
-            if (currentKey > maxKey)
-            {
-                maxKey = currentKey;
-            }
-        }
-
-        return maxKey;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
deleted file mode 100644
index cbf6b98..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class Inserter extends Operation
-{
-    private static List<ByteBuffer> values;
-
-    public Inserter(Session client, int index)
-    {
-        super(client, index);
-    }
-
-    public void run(CassandraClient client) throws IOException
-    {
-        if (values == null)
-            values = generateValues();
-
-        List<Column> columns = new ArrayList<Column>(session.getColumnsPerKey());
-        List<SuperColumn> superColumns = null;
-
-        // format used for keys
-        String format = "%0" + session.getTotalKeysLength() + "d";
-
-        for (int i = 0; i < session.getColumnsPerKey(); i++)
-        {
-            columns.add(new Column(columnName(i, session.timeUUIDComparator))
-                            .setValue(values.get(i % values.size()))
-                            .setTimestamp(FBUtilities.timestampMicros()));
-        }
-
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-        {
-            superColumns = new ArrayList<SuperColumn>();
-            // supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
-            for (int i = 0; i < session.getSuperColumns(); i++)
-            {
-                String superColumnName = "S" + Integer.toString(i);
-                superColumns.add(new SuperColumn(ByteBufferUtil.bytes(superColumnName), columns));
-            }
-        }
-
-        String rawKey = String.format(format, index);
-        Map<String, List<Mutation>> row = session.getColumnFamilyType() == ColumnFamilyType.Super
-                                        ? getSuperColumnsMutationMap(superColumns)
-                                        : getColumnsMutationMap(columns);
-        Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(ByteBufferUtil.bytes(rawKey), row);
-
-        TimerContext context = session.latency.time();
-
-        boolean success = false;
-        String exceptionMessage = null;
-        for (int t = 0; t < session.getRetryTimes(); t++)
-        {
-            if (success)
-                break;
-
-            try
-            {
-                client.batch_mutate(record, session.getConsistencyLevel());
-                success = true;
-            }
-            catch (Exception e)
-            {
-                exceptionMessage = getExceptionMessage(e);
-                success = false;
-            }
-        }
-
-        if (!success)
-        {
-            error(String.format("Operation [%d] retried %d times - error inserting key %s %s%n",
-                                index,
-                                session.getRetryTimes(),
-                                rawKey,
-                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-        }
-
-        session.operations.getAndIncrement();
-        session.keys.getAndIncrement();
-        context.stop();
-    }
-
-    private Map<String, List<Mutation>> getSuperColumnsMutationMap(List<SuperColumn> superColumns)
-    {
-        List<Mutation> mutations = new ArrayList<Mutation>(superColumns.size());
-        for (SuperColumn s : superColumns)
-        {
-            ColumnOrSuperColumn superColumn = new ColumnOrSuperColumn().setSuper_column(s);
-            mutations.add(new Mutation().setColumn_or_supercolumn(superColumn));
-        }
-
-        return Collections.singletonMap("Super1", mutations);
-    }
-
-    private Map<String, List<Mutation>> getColumnsMutationMap(List<Column> columns)
-    {
-        List<Mutation> mutations = new ArrayList<Mutation>(columns.size());
-        for (Column c : columns)
-        {
-            ColumnOrSuperColumn column = new ColumnOrSuperColumn().setColumn(c);
-            mutations.add(new Mutation().setColumn_or_supercolumn(column));
-        }
-
-        return Collections.singletonMap("Standard1", mutations);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
deleted file mode 100644
index 12a39fb..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-public class MultiGetter extends Operation
-{
-    public MultiGetter(Session client, int index)
-    {
-        super(client, index);
-    }
-
-    public void run(CassandraClient client) throws IOException
-    {
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                                                      ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                                                      false, session.getColumnsPerKey()));
-
-        int offset = index * session.getKeysPerThread();
-        Map<ByteBuffer,List<ColumnOrSuperColumn>> results;
-
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-        {
-            List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
-
-            for (int j = 0; j < session.getSuperColumns(); j++)
-            {
-                ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBufferUtil.bytes("S" + j));
-
-                TimerContext context = session.latency.time();
-
-                boolean success = false;
-                String exceptionMessage = null;
-
-                for (int t = 0; t < session.getRetryTimes(); t++)
-                {
-                    if (success)
-                        break;
-
-                    try
-                    {
-                        results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
-                        success = (results.size() != 0);
-                    }
-                    catch (Exception e)
-                    {
-                        exceptionMessage = getExceptionMessage(e);
-                    }
-                }
-
-                if (!success)
-                {
-                    error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
-                                        index,
-                                        session.getRetryTimes(),
-                                        keys,
-                                        (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-                }
-
-                session.operations.getAndIncrement();
-                session.keys.getAndAdd(keys.size());
-                context.stop();
-
-                offset += session.getKeysPerCall();
-            }
-        }
-        else
-        {
-            ColumnParent parent = new ColumnParent("Standard1");
-
-            List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
-
-            TimerContext context = session.latency.time();
-
-            boolean success = false;
-            String exceptionMessage = null;
-
-            for (int t = 0; t < session.getRetryTimes(); t++)
-            {
-                if (success)
-                    break;
-
-                try
-                {
-                    results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
-                    success = (results.size() != 0);
-                }
-                catch (Exception e)
-                {
-                    exceptionMessage = getExceptionMessage(e);
-                    success = false;
-                }
-            }
-
-            if (!success)
-            {
-                error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
-                                    index,
-                                    session.getRetryTimes(),
-                                    keys,
-                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-            }
-
-            session.operations.getAndIncrement();
-            session.keys.getAndAdd(keys.size());
-            context.stop();
-
-            offset += session.getKeysPerCall();
-        }
-    }
-
-    private List<ByteBuffer> generateKeys(int start, int limit)
-    {
-        List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
-
-        for (int i = start; i < limit; i++)
-        {
-            keys.add(ByteBuffer.wrap(generateKey()));
-        }
-
-        return keys;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
deleted file mode 100644
index f9ba115..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-public class RangeSlicer extends Operation
-{
-
-    public RangeSlicer(Session client, int index)
-    {
-        super(client, index);
-    }
-
-    public void run(CassandraClient client) throws IOException
-    {
-        String format = "%0" + session.getTotalKeysLength() + "d";
-
-        // initial values
-        int count = session.getColumnsPerKey();
-
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                                                      ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                                                      false,
-                                                                                      count));
-
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-        {
-            ByteBuffer start = ByteBufferUtil.bytes(String.format(format, index));
-
-            List<KeySlice> slices = new ArrayList<KeySlice>();
-            KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-
-            for (int i = 0; i < session.getSuperColumns(); i++)
-            {
-                String superColumnName = "S" + Integer.toString(i);
-                ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBufferUtil.bytes(superColumnName));
-
-                TimerContext context = session.latency.time();
-
-                boolean success = false;
-                String exceptionMessage = null;
-
-                for (int t = 0; t < session.getRetryTimes(); t++)
-                {
-                    try
-                    {
-                        slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
-                        success = (slices.size() != 0);
-                    }
-                    catch (Exception e)
-                    {
-                        exceptionMessage = getExceptionMessage(e);
-                        success = false;
-                    }
-                }
-
-                if (!success)
-                {
-                    error(String.format("Operation [%d] retried %d times - error on calling get_range_slices for range offset %s %s%n",
-                                        index,
-                                        session.getRetryTimes(),
-                                        ByteBufferUtil.string(start),
-                                        (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-                }
-
-                session.operations.getAndIncrement();
-                context.stop();
-            }
-
-            session.keys.getAndAdd(slices.size());
-        }
-        else
-        {
-            ColumnParent parent = new ColumnParent("Standard1");
-
-            ByteBuffer start = ByteBufferUtil.bytes(String.format(format, index));
-
-            List<KeySlice> slices = new ArrayList<KeySlice>();
-            KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-
-            TimerContext context = session.latency.time();
-
-            boolean success = false;
-            String exceptionMessage = null;
-
-            for (int t = 0; t < session.getRetryTimes(); t++)
-            {
-                if (success)
-                    break;
-
-                try
-                {
-                    slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
-                    success = (slices.size() != 0);
-                }
-                catch (Exception e)
-                {
-                    exceptionMessage = getExceptionMessage(e);
-                    success = false;
-                }
-            }
-
-            if (!success)
-            {
-                error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for range offset %s %s%n",
-                                    index,
-                                    session.getRetryTimes(),
-                                    ByteBufferUtil.string(start),
-                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-            }
-
-            session.operations.getAndIncrement();
-            session.keys.getAndAdd(slices.size());
-            context.stop();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
deleted file mode 100644
index 72d09b4..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-public class Reader extends Operation
-{
-    public Reader(Session client, int index)
-    {
-        super(client, index);
-    }
-
-    public void run(CassandraClient client) throws IOException
-    {
-        // initialize SlicePredicate with existing SliceRange
-        SlicePredicate predicate = new SlicePredicate();
-
-        if (session.columnNames == null)
-            predicate.setSlice_range(getSliceRange());
-        else // see CASSANDRA-3064 about why this is useful
-            predicate.setColumn_names(session.columnNames);
-
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-        {
-            runSuperColumnReader(predicate, client);
-        }
-        else
-        {
-            runColumnReader(predicate, client);
-        }
-    }
-
-    private void runSuperColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
-    {
-        byte[] rawKey = generateKey();
-        ByteBuffer key = ByteBuffer.wrap(rawKey);
-
-        for (int j = 0; j < session.getSuperColumns(); j++)
-        {
-            String superColumn = 'S' + Integer.toString(j);
-            ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes(UTF_8));
-
-            TimerContext context = session.latency.time();
-
-            boolean success = false;
-            String exceptionMessage = null;
-
-            for (int t = 0; t < session.getRetryTimes(); t++)
-            {
-                if (success)
-                    break;
-
-                try
-                {
-                    List<ColumnOrSuperColumn> columns;
-                    columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
-                    success = (columns.size() != 0);
-                }
-                catch (Exception e)
-                {
-                    exceptionMessage = getExceptionMessage(e);
-                    success = false;
-                }
-            }
-
-            if (!success)
-            {
-                error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
-                                    index,
-                                    session.getRetryTimes(),
-                                    new String(rawKey),
-                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-            }
-
-            session.operations.getAndIncrement();
-            session.keys.getAndIncrement();
-            context.stop();
-        }
-    }
-
-    private void runColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
-    {
-        ColumnParent parent = new ColumnParent("Standard1");
-
-        byte[] key = generateKey();
-        ByteBuffer keyBuffer = ByteBuffer.wrap(key);
-
-        TimerContext context = session.latency.time();
-
-        boolean success = false;
-        String exceptionMessage = null;
-
-        for (int t = 0; t < session.getRetryTimes(); t++)
-        {
-            if (success)
-                break;
-
-            try
-            {
-                List<ColumnOrSuperColumn> columns;
-                columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
-                success = (columns.size() != 0);
-            }
-            catch (Exception e)
-            {
-                exceptionMessage = getExceptionMessage(e);
-                success = false;
-            }
-        }
-
-        if (!success)
-        {
-            error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
-                                index,
-                                session.getRetryTimes(),
-                                new String(key),
-                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-        }
-
-        session.operations.getAndIncrement();
-        session.keys.getAndIncrement();
-        context.stop();
-    }
-
-    private SliceRange getSliceRange()
-    {
-        return new SliceRange()
-                    .setStart(new byte[] {})
-                    .setFinish(new byte[] {})
-                    .setReversed(false)
-                    .setCount(session.getColumnsPerKey());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
new file mode 100644
index 0000000..b1657b2
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.*;
+
+public class ThriftCounterAdder extends Operation
+{
+    public ThriftCounterAdder(State state, long index)
+    {
+        super(state, index);
+        if (state.settings.columns.variableColumnCount)
+            throw new IllegalStateException("Variable column counts not supported for counters");
+    }
+
+    public void run(final ThriftClient client) throws IOException
+    {
+        List<CounterColumn> columns = new ArrayList<CounterColumn>();
+        for (int i = 0; i < state.settings.columns.maxColumnsPerKey; i++)
+            columns.add(new CounterColumn(getColumnNameBytes(i), 1L));
+
+        Map<String, List<Mutation>> row;
+        if (state.settings.columns.useSuperColumns)
+        {
+            List<Mutation> mutations = new ArrayList<>();
+            for (ColumnParent parent : state.columnParents)
+            {
+                CounterSuperColumn csc = new CounterSuperColumn(ByteBuffer.wrap(parent.getSuper_column()), columns);
+                ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_super_column(csc);
+                mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
+            }
+            row = Collections.singletonMap("SuperCounter1", mutations);
+        }
+        else
+        {
+            List<Mutation> mutations = new ArrayList<>(columns.size());
+            for (CounterColumn c : columns)
+            {
+                ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_column(c);
+                mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
+            }
+            row = Collections.singletonMap("Counter1", mutations);
+        }
+
+        final ByteBuffer key = getKey();
+        final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, row);
+
+        timeWithRetry(new RunOp()
+        {
+            @Override
+            public boolean run() throws Exception
+            {
+                client.batch_mutate(record, state.settings.command.consistencyLevel);
+                return true;
+            }
+
+            @Override
+            public String key()
+            {
+                return new String(key.array());
+            }
+
+            @Override
+            public int keyCount()
+            {
+                return 1;
+            }
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
new file mode 100644
index 0000000..8567edd
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+
+public class ThriftCounterGetter extends Operation
+{
+    public ThriftCounterGetter(State state, long index)
+    {
+        super(state, index);
+        if (state.settings.columns.variableColumnCount)
+            throw new IllegalStateException("Variable column counts not supported for counters");
+    }
+
+    public void run(final ThriftClient client) throws IOException
+    {
+        SliceRange sliceRange = new SliceRange();
+        // start/finish
+        sliceRange.setStart(new byte[] {}).setFinish(new byte[] {});
+        // reversed/count
+        sliceRange.setReversed(false).setCount(state.settings.columns.maxColumnsPerKey);
+        // initialize SlicePredicate with existing SliceRange
+        final SlicePredicate predicate = new SlicePredicate().setSlice_range(sliceRange);
+
+        final ByteBuffer key = getKey();
+        for (final ColumnParent parent : state.columnParents)
+        {
+
+            timeWithRetry(new RunOp()
+            {
+                @Override
+                public boolean run() throws Exception
+                {
+                    return client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel).size() != 0;
+                }
+
+                @Override
+                public String key()
+                {
+                    return new String(key.array());
+                }
+
+                @Override
+                public int keyCount()
+                {
+                    return 1;
+                }
+            });
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
new file mode 100644
index 0000000..c6b1b03
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
@@ -0,0 +1,115 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.settings.SettingsCommandMulti;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
/**
 * Stress operation that pages through a secondary index over Thrift:
 * repeatedly issues get_indexed_slices for rows whose C1 column equals a
 * generated value, advancing the key lower bound past the largest key seen
 * until a page comes back empty.
 */
public class ThriftIndexedRangeSlicer extends Operation
{
    public ThriftIndexedRangeSlicer(State state, long index)
    {
        super(state, index);
        // paging assumes the same key always maps to the same row contents,
        // so both generators must be deterministic
        if (!state.rowGen.isDeterministic() || !state.keyGen.isDeterministic())
            throw new IllegalStateException("Only run with a isDeterministic row/key generator");
        if (state.settings.columns.useSuperColumns || state.columnParents.size() != 1)
            throw new IllegalStateException("Does not support super columns");
        if (state.settings.columns.useTimeUUIDComparator)
            throw new IllegalStateException("Does not support TimeUUID column names");
    }

    public void run(final ThriftClient client) throws IOException
    {

        // select every column of each matching row, up to the configured maximum
        final SlicePredicate predicate = new SlicePredicate()
                .setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                        false, state.settings.columns.maxColumnsPerKey));
        final List<ByteBuffer> columns = generateColumnValues();
        final ColumnParent parent = state.columnParents.get(0);

        final ByteBuffer columnName = getColumnNameBytes(1);
        final ByteBuffer value = columns.get(1); // only C1 column is indexed

        IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
        byte[] minKey = new byte[0];
        // single-element array so the anonymous RunOp below can write the page
        // of results back out for keyCount() and the paging loop
        final List<KeySlice>[] results = new List[1];
        do
        {

            // first page only: an empty result counts as failure (later pages
            // legitimately drain to empty, which terminates the loop)
            final boolean first = minKey.length == 0;
            final IndexClause clause = new IndexClause(Arrays.asList(expression),
                                                 ByteBuffer.wrap(minKey),
                                                ((SettingsCommandMulti) state.settings.command).keysAtOnce);

            timeWithRetry(new RunOp()
            {
                @Override
                public boolean run() throws Exception
                {
                    results[0] = client.get_indexed_slices(parent, clause, predicate, state.settings.command.consistencyLevel);
                    return !first || results[0].size() > 0;
                }

                @Override
                public String key()
                {
                    // reported "key" is the indexed value being queried
                    return new String(value.array());
                }

                @Override
                public int keyCount()
                {
                    return results[0].size();
                }
            });

            // advance the lower bound strictly past the largest key in this page
            minKey = getNextMinKey(minKey, results[0]);

        } while (results[0].size() > 0);
    }

    /**
     * Get maximum key from keySlice list, incremented by one so it can serve
     * as an exclusive lower bound for the next page.
     * NOTE(review): the increment mutates the returned array in place; when the
     * max came from a KeySlice this also mutates that slice's key array — the
     * results list is discarded immediately afterwards, so this appears benign,
     * but confirm before reusing this helper elsewhere.
     * @param slices list of the KeySlice objects
     * @return maximum key value of the list
     */
    private static byte[] getNextMinKey(byte[] cur, List<KeySlice> slices)
    {
        // find max (unsigned byte-wise comparison)
        for (KeySlice slice : slices)
            if (FBUtilities.compareUnsigned(cur, slice.getKey()) < 0)
                cur = slice.getKey();

        // increment: little-endian-style +1 with carry; bytes wrap 0xFF -> 0x00
        for (int i = 0 ; i < cur.length ; i++)
            if (++cur[i] != 0)
                break;
        return cur;
    }

}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
new file mode 100644
index 0000000..c5f8051
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+public final class ThriftInserter extends Operation
+{
+
+    public ThriftInserter(State state, long index)
+    {
+        super(state, index);
+    }
+
+    public void run(final ThriftClient client) throws IOException
+    {
+        final ByteBuffer key = getKey();
+        final List<Column> columns = generateColumns();
+
+        Map<String, List<Mutation>> row;
+        if (!state.settings.columns.useSuperColumns)
+        {
+            List<Mutation> mutations = new ArrayList<>(columns.size());
+            for (Column c : columns)
+            {
+                ColumnOrSuperColumn column = new ColumnOrSuperColumn().setColumn(c);
+                mutations.add(new Mutation().setColumn_or_supercolumn(column));
+            }
+            row = Collections.singletonMap(state.settings.schema.columnFamily, mutations);
+        }
+        else
+        {
+            List<Mutation> mutations = new ArrayList<>(state.columnParents.size());
+            for (ColumnParent parent : state.columnParents)
+            {
+                final SuperColumn s = new SuperColumn(parent.bufferForSuper_column(), columns);
+                final ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setSuper_column(s);
+                mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
+            }
+            row = Collections.singletonMap("Super1", mutations);
+        }
+
+        final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, row);
+
+        timeWithRetry(new RunOp()
+        {
+            @Override
+            public boolean run() throws Exception
+            {
+                client.batch_mutate(record, state.settings.command.consistencyLevel);
+                return true;
+            }
+
+            @Override
+            public String key()
+            {
+                return new String(key.array());
+            }
+
+            @Override
+            public int keyCount()
+            {
+                return 1;
+            }
+        });
+    }
+
+    protected List<Column> generateColumns()
+    {
+        final List<ByteBuffer> values = generateColumnValues();
+        final List<Column> columns = new ArrayList<>(values.size());
+
+        if (state.settings.columns.useTimeUUIDComparator)
+            for (int i = 0 ; i < values.size() ; i++)
+                new Column(TimeUUIDType.instance.decompose(UUIDGen.getTimeUUID()));
+        else
+            // TODO : consider randomly allocating column names in case where have fewer than max columns
+            // but need to think about implications for indexes / indexed range slicer / other knock on effects
+            for (int i = 0 ; i < values.size() ; i++)
+                columns.add(new Column(getColumnNameBytes(i)));
+
+        for (int i = 0 ; i < values.size() ; i++)
+            columns.get(i)
+                    .setValue(values.get(i))
+                    .setTimestamp(FBUtilities.timestampMicros());
+
+        return columns;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
new file mode 100644
index 0000000..01c7325
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.settings.SettingsCommandMulti;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+public final class ThriftMultiGetter extends Operation
+{
+
+    public ThriftMultiGetter(State state, long index)
+    {
+        super(state, index);
+    }
+
+    public void run(final ThriftClient client) throws IOException
+    {
+
+        final SlicePredicate predicate = new SlicePredicate().setSlice_range(
+                new SliceRange(
+                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                        false,
+                        state.settings.columns.maxColumnsPerKey
+                )
+        );
+
+        final List<ByteBuffer> keys = getKeys(((SettingsCommandMulti) state.settings.command).keysAtOnce);
+
+        for (final ColumnParent parent : state.columnParents)
+        {
+            timeWithRetry(new RunOp()
+            {
+                int count;
+                @Override
+                public boolean run() throws Exception
+                {
+                    return (count = client.multiget_slice(keys, parent, predicate, state.settings.command.consistencyLevel).size()) != 0;
+                }
+
+                @Override
+                public String key()
+                {
+                    return keys.toString();
+                }
+
+                @Override
+                public int keyCount()
+                {
+                    return count;
+                }
+            });
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
new file mode 100644
index 0000000..ce6c8cd
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.settings.SettingsCommandMulti;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public final class ThriftRangeSlicer extends Operation
+{
+
+    public ThriftRangeSlicer(State state, long index)
+    {
+        super(state, index);
+    }
+
+    @Override
+    public void run(final ThriftClient client) throws IOException
+    {
+        final SlicePredicate predicate = new SlicePredicate()
+                .setSlice_range(
+                        new SliceRange(
+                                ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                false,
+                                state.settings.columns.maxColumnsPerKey
+                        )
+                );
+
+        final ByteBuffer start = getKey();
+        final KeyRange range =
+                new KeyRange(state.settings.columns.maxColumnsPerKey)
+                        .setStart_key(start)
+                        .setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER)
+                        .setCount(((SettingsCommandMulti)state.settings.command).keysAtOnce);
+
+        for (final ColumnParent parent : state.columnParents)
+        {
+            timeWithRetry(new RunOp()
+            {
+                private int count = 0;
+                @Override
+                public boolean run() throws Exception
+                {
+                    return (count = client.get_range_slices(parent, predicate, range, state.settings.command.consistencyLevel).size()) != 0;
+                }
+
+                @Override
+                public String key()
+                {
+                    return new String(range.bufferForStart_key().array());
+                }
+
+                @Override
+                public int keyCount()
+                {
+                    return count;
+                }
+            });
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
new file mode 100644
index 0000000..a8605e8
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+
+public final class ThriftReader extends Operation
+{
+
+    public ThriftReader(State state, long index)
+    {
+        super(state, index);
+    }
+
+    public void run(final ThriftClient client) throws IOException
+    {
+        final SlicePredicate predicate = new SlicePredicate();
+        if (state.settings.columns.names == null)
+            predicate.setSlice_range(new SliceRange()
+                    .setStart(new byte[] {})
+                    .setFinish(new byte[] {})
+                    .setReversed(false)
+                    .setCount(state.settings.columns.maxColumnsPerKey)
+            );
+        else // see CASSANDRA-3064 about why this is useful
+            predicate.setColumn_names(state.settings.columns.names);
+
+        final ByteBuffer key = getKey();
+        for (final ColumnParent parent : state.columnParents)
+        {
+            timeWithRetry(new RunOp()
+            {
+                @Override
+                public boolean run() throws Exception
+                {
+                    return client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel).size() != 0;
+                }
+
+                @Override
+                public String key()
+                {
+                    return new String(key.array());
+                }
+
+                @Override
+                public int keyCount()
+                {
+                    return 1;
+                }
+            });
+        }
+    }
+
+}


Mime
View raw message