cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xe...@apache.org
Subject [5/6] Improve Stress Tool patch by Benedict; reviewed by Pavel Yaskevich for CASSANDRA-6199
Date Tue, 24 Dec 2013 02:08:44 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
new file mode 100644
index 0000000..b9f1a47
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -0,0 +1,178 @@
+package org.apache.cassandra.stress;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.stress.util.Timing;
+import org.apache.cassandra.stress.util.TimingInterval;
+import org.apache.cassandra.stress.util.Uncertainty;
+import org.apache.commons.lang3.time.DurationFormatUtils;
+
+public class StressMetrics
+{
+
+    private static final ThreadFactory tf = new NamedThreadFactory("StressMetrics");
+
+    private final PrintStream output;
+    private final Thread thread;
+    private volatile boolean stop = false;
+    private volatile boolean cancelled = false;
+    private final Uncertainty opRateUncertainty = new Uncertainty();
+    private final CountDownLatch stopped = new CountDownLatch(1);
+    private final Timing timing = new Timing();
+
+    public StressMetrics(PrintStream output, final long logIntervalMillis)
+    {
+        this.output = output;
+        printHeader("", output);
+        thread = tf.newThread(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                timing.start();
+                try {
+
+                    while (!stop)
+                    {
+                        try
+                        {
+                            long sleep = timing.getHistory().endMillis() + logIntervalMillis - System.currentTimeMillis();
+                            if (sleep < logIntervalMillis >>> 3)
+                                // if had a major hiccup, sleep full interval
+                                Thread.sleep(logIntervalMillis);
+                            else
+                                Thread.sleep(sleep);
+                            update();
+                        } catch (InterruptedException e)
+                        {
+                            break;
+                        }
+                    }
+
+                    update();
+                }
+                catch (InterruptedException e)
+                {}
+                catch (Exception e)
+                {
+                    cancel();
+                    e.printStackTrace(StressMetrics.this.output);
+                }
+                finally
+                {
+                    stopped.countDown();
+                }
+            }
+        });
+    }
+
+    public void start()
+    {
+        thread.start();
+    }
+
+    public void waitUntilConverges(double targetUncertainty, int minMeasurements, int maxMeasurements) throws InterruptedException
+    {
+        opRateUncertainty.await(targetUncertainty, minMeasurements, maxMeasurements);
+    }
+
+    public void cancel()
+    {
+        cancelled = true;
+        stop = true;
+        thread.interrupt();
+        opRateUncertainty.wakeAll();
+    }
+
+    public void stop() throws InterruptedException
+    {
+        stop = true;
+        thread.interrupt();
+        stopped.await();
+    }
+
+    private void update() throws InterruptedException
+    {
+        TimingInterval interval = timing.snapInterval();
+        printRow("", interval, timing.getHistory(), opRateUncertainty, output);
+        opRateUncertainty.update(interval.adjustedOpRate());
+    }
+
+
+    // PRINT FORMATTING
+
+    public static final String HEADFORMAT = "%-10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s";
+    public static final String ROWFORMAT =  "%-10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f";
+
+    private static void printHeader(String prefix, PrintStream output)
+    {
+        output.println(prefix + String.format(HEADFORMAT, "ops","op/s", "adj op/s","key/s","mean","med",".95",".99",".999","max","time","stderr"));
+    }
+
+    private static void printRow(String prefix, TimingInterval interval, TimingInterval total, Uncertainty opRateUncertainty, PrintStream output)
+    {
+        output.println(prefix + String.format(ROWFORMAT,
+                total.operationCount,
+                interval.realOpRate(),
+                interval.adjustedOpRate(),
+                interval.keyRate(),
+                interval.meanLatency(),
+                interval.medianLatency(),
+                interval.rankLatency(0.95f),
+                interval.rankLatency(0.99f),
+                interval.rankLatency(0.999f),
+                interval.maxLatency(),
+                total.runTime() / 1000f,
+                opRateUncertainty.getUncertainty()));
+    }
+
+    public void summarise()
+    {
+        output.println("\n");
+        output.println("Results:");
+        TimingInterval history = timing.getHistory();
+        output.println(String.format("real op rate              : %.0f", history.realOpRate()));
+        output.println(String.format("adjusted op rate          : %.0f", history.adjustedOpRate()));
+        output.println(String.format("adjusted op rate stderr   : %.5f", opRateUncertainty.getUncertainty())); // same precision as the stderr column in ROWFORMAT; %.0f rounds small values to 0
+        output.println(String.format("key rate                  : %.0f", history.keyRate()));
+        output.println(String.format("latency mean              : %.1f", history.meanLatency()));
+        output.println(String.format("latency median            : %.1f", history.medianLatency()));
+        output.println(String.format("latency 95th percentile   : %.1f", history.rankLatency(.95f)));
+        output.println(String.format("latency 99th percentile   : %.1f", history.rankLatency(0.99f)));
+        output.println(String.format("latency 99.9th percentile : %.1f", history.rankLatency(0.999f)));
+        output.println(String.format("latency max               : %.1f", history.maxLatency()));
+        output.println("Total operation time      : " + DurationFormatUtils.formatDuration(
+                history.runTime(), "HH:mm:ss", true));
+    }
+
+    public static final void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out)
+    {
+        int idLen = 0;
+        for (String id : ids)
+            idLen = Math.max(id.length(), idLen);
+        String formatstr = "%" + idLen + "s, ";
+        printHeader(String.format(formatstr, "id"), out);
+        for (int i = 0 ; i < ids.size() ; i++)
+            printRow(String.format(formatstr, ids.get(i)),
+                    summarise.get(i).timing.getHistory(),
+                    summarise.get(i).timing.getHistory(),
+                    summarise.get(i).opRateUncertainty,
+                    out
+            );
+    }
+
+    public Timing getTiming()
+    {
+        return timing;
+    }
+
+    public boolean wasCancelled()
+    {
+        return cancelled;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/StressServer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressServer.java b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
index 6600dfd..3c9e2a6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressServer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
@@ -1,27 +1,30 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.apache.cassandra.stress;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.PrintStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
+import java.net.Socket;
 
-import org.apache.cassandra.stress.server.StressThread;
+import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.commons.cli.*;
 
 public class StressServer
@@ -68,4 +71,57 @@ public class StressServer
         for (;;)
             new StressThread(serverSocket.accept()).start();
     }
+
+    public static class StressThread extends Thread
+    {
+        private final Socket socket;
+
+        public StressThread(Socket client)
+        {
+            this.socket = client;
+        }
+
+        public void run()
+        {
+            try
+            {
+                ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
+                PrintStream out = new PrintStream(socket.getOutputStream());
+
+                StressAction action = new StressAction((StressSettings) in.readObject(), out);
+                Thread actionThread = new Thread(action);
+                actionThread.start();
+
+                while (actionThread.isAlive())
+                {
+                    try
+                    {
+                        if (in.readInt() == 1)
+                        {
+                            actionThread.interrupt();
+                            break;
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        // continue without problem
+                    }
+                }
+
+                out.close();
+                in.close();
+                socket.close();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+            catch (Exception e)
+            {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/StressStatistics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressStatistics.java b/tools/stress/src/org/apache/cassandra/stress/StressStatistics.java
deleted file mode 100644
index b739c8e..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/StressStatistics.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.stress;
-
-import java.io.PrintStream;
-import org.apache.commons.lang3.time.DurationFormatUtils;
-
-import com.yammer.metrics.stats.Snapshot;
-
-
-/**
- * Gathers and aggregates statistics for an operation
- */
-public class StressStatistics
-{
-    
-    private Session client;
-    private PrintStream output;
-
-    private long durationInSeconds;
-    /** The sum of the interval_op_rate values collected by tallyAverages */
-    private int tallyOpRateSum;
-    /** The number of interval_op_rate values collected by tallyAverages */
-    private int tallyOpRateCount;
-    /** The sum of the interval_key_rate values collected by tallyAverages */
-    private int tallyKeyRateSum;
-    /** The number of interval_key_rate values collected by tallyAverages */
-    private int tallyKeyRateCount;
-
-    /** The sum of the latency values collected by tallyAverages */
-    private double tallyLatencySum;
-    /** The number of latency values collected by tallyAverages */
-    private int tallyLatencyCount;
-    /** The sum of the 95%tile latency values collected by tallyAverages */
-    private double tally95thLatencySum;
-    /** The number of 95%tile latency values collected by tallyAverages */
-    private int tally95thLatencyCount;
-    /** The sum of the 99.9%tile latency values collected by tallyAverages */
-    private double tally999thLatencySum;
-    /** The number of 99.9%tile latency values collected by tallyAverages */
-    private int tally999thLatencyCount;
-    
-
-    public StressStatistics(Session client, PrintStream out)
-    {
-        this.client = client;
-        this.output = out;
-
-        tallyOpRateSum = 0;
-        tallyOpRateCount = 0;
-    }
-
-    /**
-     * Collect statistics per-interval
-     */
-    public void addIntervalStats(int totalOperations, int intervalOpRate, 
-                                 int intervalKeyRate, Snapshot latency, 
-                                 long currentTimeInSeconds)
-    {
-        this.tallyAverages(totalOperations, intervalKeyRate, intervalKeyRate, 
-                                latency, currentTimeInSeconds);
-    }
-
-    /**
-     * Collect interval_op_rate and interval_key_rate averages
-     */
-    private void tallyAverages(int totalOperations, int intervalOpRate, 
-                                 int intervalKeyRate, Snapshot latency, 
-                                 long currentTimeInSeconds)
-    {
-        //Skip the first and last 10% of values.
-        //The middle values of the operation are the ones worthwhile
-        //to collect and average:
-        if (totalOperations > (0.10 * client.getNumKeys()) &&
-            totalOperations < (0.90 * client.getNumKeys())) {
-                tallyOpRateSum += intervalOpRate;
-                tallyOpRateCount += 1;
-                tallyKeyRateSum += intervalKeyRate;
-                tallyKeyRateCount += 1;
-                tallyLatencySum += latency.getMedian();
-                tallyLatencyCount += 1;
-                tally95thLatencySum += latency.get95thPercentile();
-                tally95thLatencyCount += 1;
-                tally999thLatencySum += latency.get999thPercentile();
-                tally999thLatencyCount += 1;
-            }
-        durationInSeconds = currentTimeInSeconds;
-    }
-
-    public void printStats()
-    {
-        output.println("\n");
-        if (tallyOpRateCount > 0) {
-            output.println("Averages from the middle 80% of values:");
-            output.println(String.format("interval_op_rate          : %d", 
-                                         (tallyOpRateSum / tallyOpRateCount)));
-            output.println(String.format("interval_key_rate         : %d", 
-                                         (tallyKeyRateSum / tallyKeyRateCount)));
-            output.println(String.format("latency median            : %.1f", 
-                                         (tallyLatencySum / tallyLatencyCount)));
-            output.println(String.format("latency 95th percentile   : %.1f",
-                                         (tally95thLatencySum / tally95thLatencyCount)));
-            output.println(String.format("latency 99.9th percentile : %.1f", 
-                                         (tally999thLatencySum / tally999thLatencyCount)));
-        }
-        output.println("Total operation time      : " + DurationFormatUtils.formatDuration(
-            durationInSeconds*1000, "HH:mm:ss", true));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java
new file mode 100644
index 0000000..4c22005
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java
@@ -0,0 +1,18 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public abstract class DataGen
+{
+
+    public abstract void generate(ByteBuffer fill, long offset);
+    public abstract boolean isDeterministic();
+
+    public void generate(List<ByteBuffer> fills, long offset)
+    {
+        for (ByteBuffer fill : fills)
+            generate(fill, offset++);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
new file mode 100644
index 0000000..3906f93
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
@@ -0,0 +1,24 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+public class DataGenBytesRandom extends DataGen
+{
+
+    private final Random rnd = new Random();
+
+    @Override
+    public void generate(ByteBuffer fill, long offset)
+    {
+        fill.clear();
+        rnd.nextBytes(fill.array());
+    }
+
+    @Override
+    public boolean isDeterministic()
+    {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenFactory.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenFactory.java
new file mode 100644
index 0000000..c5738cc
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenFactory.java
@@ -0,0 +1,9 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.io.Serializable;
+
+public interface DataGenFactory extends Serializable
+{
+    DataGen get();
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java
new file mode 100644
index 0000000..50d49dd
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java
@@ -0,0 +1,39 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+
+public abstract class DataGenHex extends DataGen
+{
+
+    abstract long next(long operationIndex);
+
+    @Override
+    public final void generate(ByteBuffer fill, long operationIndex)
+    {
+        fill.clear();
+        fillKeyStringBytes(next(operationIndex), fill.array());
+    }
+
+    public static void fillKeyStringBytes(long key, byte[] fill)
+    {
+        int ub = fill.length - 1;
+        int offset = 0;
+        while (key != 0)
+        {
+            int digit = ((int) key) & 15;
+            key >>>= 4;
+            fill[ub - offset++] = digit(digit);
+        }
+        while (offset < fill.length)
+            fill[ub - offset++] = '0';
+    }
+
+    // needs to be UTF-8, but for these chars there is no difference
+    private static byte digit(int num)
+    {
+        if (num < 10)
+            return (byte)('0' + num);
+        return (byte)('A' + (num - 10));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromDistribution.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromDistribution.java
new file mode 100644
index 0000000..3391fce
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromDistribution.java
@@ -0,0 +1,45 @@
+package org.apache.cassandra.stress.generatedata;
+
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.commons.math3.distribution.UniformRealDistribution;
+
+public class DataGenHexFromDistribution extends DataGenHex
+{
+
+    final Distribution distribution;
+
+    public DataGenHexFromDistribution(Distribution distribution)
+    {
+        this.distribution = distribution;
+    }
+
+    @Override
+    public boolean isDeterministic()
+    {
+        return false;
+    }
+
+    @Override
+    long next(long operationIndex)
+    {
+        return distribution.next();
+    }
+
+    public static DataGenHex buildGaussian(long minKey, long maxKey, double stdevsToLimit)
+    {
+        double midRange = (maxKey + minKey) / 2d;
+        double halfRange = (maxKey - minKey) / 2d;
+        return new DataGenHexFromDistribution(new DistributionBoundApache(new NormalDistribution(midRange, halfRange / stdevsToLimit), minKey, maxKey));
+    }
+
+    public static DataGenHex buildGaussian(long minKey, long maxKey, double mean, double stdev)
+    {
+        return new DataGenHexFromDistribution(new DistributionBoundApache(new NormalDistribution(mean, stdev), minKey, maxKey));
+    }
+
+    public static DataGenHex buildUniform(long minKey, long maxKey)
+    {
+        return new DataGenHexFromDistribution(new DistributionBoundApache(new UniformRealDistribution(minKey, maxKey), minKey, maxKey));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromOpIndex.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromOpIndex.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromOpIndex.java
new file mode 100644
index 0000000..5d499d5
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromOpIndex.java
@@ -0,0 +1,27 @@
+package org.apache.cassandra.stress.generatedata;
+
+public class DataGenHexFromOpIndex extends DataGenHex
+{
+
+    final long minKey;
+    final long maxKey;
+
+    public DataGenHexFromOpIndex(long minKey, long maxKey)
+    {
+        this.minKey = minKey;
+        this.maxKey = maxKey;
+    }
+
+    @Override
+    public boolean isDeterministic()
+    {
+        return true;
+    }
+
+    @Override
+    long next(long operationIndex)
+    {
+        long range = maxKey + 1 - minKey;
+        return Math.abs((operationIndex % range) + minKey);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
new file mode 100644
index 0000000..68c8034
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
@@ -0,0 +1,84 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.util.Pair;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+public class DataGenStringDictionary extends DataGen
+{
+
+    private final byte space = ' ';
+    private final EnumeratedDistribution<byte[]> words;
+
+    public DataGenStringDictionary(EnumeratedDistribution<byte[]> wordDistribution)
+    {
+        words = wordDistribution;
+    }
+
+    @Override
+    public void generate(ByteBuffer fill, long index)
+    {
+        fill(fill, 0);
+    }
+
+    @Override
+    public void generate(List<ByteBuffer> fills, long index)
+    {
+        for (int i = 0 ; i < fills.size() ; i++)
+            fill(fills.get(i), i); // fill each buffer in turn; get(0) only ever refilled the first one
+    }
+
+    private void fill(ByteBuffer fill, int column)
+    {
+        fill.clear();
+        byte[] trg = fill.array();
+        int i = 0;
+        while (i < trg.length)
+        {
+            if (i > 0)
+                trg[i++] = space;
+            byte[] src = words.sample();
+            System.arraycopy(src, 0, trg, i, Math.min(src.length, trg.length - i));
+            i += src.length;
+        }
+    }
+
+    @Override
+    public boolean isDeterministic()
+    {
+        return true;
+    }
+
+    public static DataGenFactory getFactory(File file) throws IOException
+    {
+        final List<Pair<byte[], Double>> words = new ArrayList<>();
+        final BufferedReader reader = new BufferedReader(new FileReader(file));
+        String line;
+        while ( null != (line = reader.readLine()) )
+        {
+            String[] pair = line.split(" +");
+            if (pair.length != 2)
+                throw new IllegalArgumentException("Invalid record in dictionary: \"" + line + "\"");
+            words.add(new Pair<>(pair[1].getBytes(UTF_8), Double.parseDouble(pair[0])));
+        }
+        final EnumeratedDistribution<byte[]> dist = new EnumeratedDistribution<byte[]>(words);
+        return new DataGenFactory()
+        {
+            @Override
+            public DataGen get()
+            {
+                return new DataGenStringDictionary(dist);
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
new file mode 100644
index 0000000..47091f7
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
@@ -0,0 +1,69 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+public class DataGenStringRepeats extends DataGen
+{
+
+    private static final ConcurrentHashMap<Integer, ConcurrentHashMap<Long, byte[]>> CACHE_LOOKUP = new ConcurrentHashMap<>();
+
+    private final ConcurrentHashMap<Long, byte[]> cache;
+    private final int repeatFrequency;
+    public DataGenStringRepeats(int repeatFrequency)
+    {
+        if (!CACHE_LOOKUP.containsKey(repeatFrequency))
+            CACHE_LOOKUP.putIfAbsent(repeatFrequency, new ConcurrentHashMap<Long, byte[]>());
+        cache = CACHE_LOOKUP.get(repeatFrequency);
+        this.repeatFrequency = repeatFrequency;
+    }
+
+    @Override
+    public void generate(ByteBuffer fill, long index)
+    {
+        fill(fill, index, 0);
+    }
+
+    @Override
+    public void generate(List<ByteBuffer> fills, long index)
+    {
+        for (int i = 0 ; i < fills.size() ; i++)
+        {
+            fill(fills.get(i), index, i);
+        }
+    }
+
+    private void fill(ByteBuffer fill, long index, int column)
+    {
+        fill.clear();
+        byte[] trg = fill.array();
+        byte[] src = getData(index, column);
+        for (int j = 0 ; j < trg.length ; j += src.length)
+            System.arraycopy(src, 0, trg, j, Math.min(src.length, trg.length - j));
+    }
+
+    private byte[] getData(long index, int column)
+    {
+        final long key = ((long) column * repeatFrequency) + (index % repeatFrequency); // widen before multiplying so the int product cannot overflow
+        byte[] r = cache.get(key);
+        if (r != null)
+            return r;
+        MessageDigest md = FBUtilities.threadLocalMD5Digest();
+        r = md.digest(Long.toString(key).getBytes(UTF_8));
+        cache.putIfAbsent(key, r);
+        return r;
+    }
+
+    @Override
+    public boolean isDeterministic()
+    {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/Distribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/Distribution.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/Distribution.java
new file mode 100644
index 0000000..5236eab
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/Distribution.java
@@ -0,0 +1,19 @@
+package org.apache.cassandra.stress.generatedata;
+
+public abstract class Distribution
+{
+
+    public abstract long next();
+    public abstract long inverseCumProb(double cumProb);
+
+    public long maxValue()
+    {
+        return inverseCumProb(1d);
+    }
+
+    public long minValue()
+    {
+        return inverseCumProb(0d);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionBoundApache.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionBoundApache.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionBoundApache.java
new file mode 100644
index 0000000..9f59dbd
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionBoundApache.java
@@ -0,0 +1,42 @@
+package org.apache.cassandra.stress.generatedata;
+
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
+
+public class DistributionBoundApache extends Distribution
+{
+
+    final AbstractRealDistribution delegate;
+    final long min, max;
+
+    public DistributionBoundApache(AbstractRealDistribution delegate, long min, long max)
+    {
+        this.delegate = delegate;
+        this.min = min;
+        this.max = max;
+    }
+
+    @Override
+    public long next()
+    {
+        return bound(min, max, delegate.sample());
+    }
+
+    @Override
+    public long inverseCumProb(double cumProb)
+    {
+        return bound(min, max, delegate.inverseCumulativeProbability(cumProb));
+    }
+
+    // Truncates val towards zero (a NaN becomes 0) and clamps the result into [min, max].
+    private static long bound(long min, long max, double val)
+    {
+        long r = (long) val;
+        // every long is below min, above max, or in range, so the in-range case is simply the fall-through
+        if (r < min)
+            return min;
+        if (r > max)
+            return max;
+        return r;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFactory.java
new file mode 100644
index 0000000..ac2b7ba
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFactory.java
@@ -0,0 +1,10 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.io.Serializable;
+
+public interface DistributionFactory extends Serializable
+{
+
+    Distribution get();
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFixed.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFixed.java
new file mode 100644
index 0000000..6873b1c
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFixed.java
@@ -0,0 +1,25 @@
+package org.apache.cassandra.stress.generatedata;
+
+public class DistributionFixed extends Distribution
+{
+
+    final long key;
+
+    public DistributionFixed(long key)
+    {
+        this.key = key;
+    }
+
+    @Override
+    public long next()
+    {
+        return key;
+    }
+
+    @Override
+    public long inverseCumProb(double cumProb)
+    {
+        return key;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java
new file mode 100644
index 0000000..c7a5aca
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java
@@ -0,0 +1,40 @@
+package org.apache.cassandra.stress.generatedata;
+
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
+
+public class DistributionOffsetApache extends Distribution
+{
+
+    final AbstractRealDistribution delegate;
+    final long min, delta;
+
+    public DistributionOffsetApache(AbstractRealDistribution delegate, long min, long max)
+    {
+        this.delegate = delegate;
+        this.min = min;
+        this.delta = max - min;
+    }
+
+    @Override
+    public long next()
+    {
+        return offset(min, delta, delegate.sample());
+    }
+
+    @Override
+    public long inverseCumProb(double cumProb)
+    {
+        return offset(min, delta, delegate.inverseCumulativeProbability(cumProb));
+    }
+
+    private long offset(long min, long delta, double val)
+    {
+        long r = (long) val;
+        if (r < 0)
+            r = 0;
+        if (r > delta)
+            r = delta;
+        return min + r;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java
new file mode 100644
index 0000000..a1a51bb
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java
@@ -0,0 +1,47 @@
+package org.apache.cassandra.stress.generatedata;
+
+public class DistributionSeqBatch extends DataGenHex
+{
+
+    final Distribution delegate;
+    final int batchSize;
+    final long maxKey;
+
+    private int batchIndex;
+    private long batchKey;
+
+    // object must be published safely if passed between threads, due to batchIndex not being volatile. various
+    // hacks are possible, but not ideal. don't want to use volatile as the object is intended for single-threaded use.
+    public DistributionSeqBatch(int batchSize, long maxKey, Distribution delegate)
+    {
+        this.batchIndex = batchSize;
+        this.batchSize = batchSize;
+        this.maxKey = maxKey;
+        this.delegate = delegate;
+    }
+
+    @Override
+    long next(long operationIndex)
+    {
+        if (batchIndex >= batchSize)
+        {
+            batchKey = delegate.next();
+            batchIndex = 0;
+        }
+        long r = batchKey + batchIndex++;
+        if (r > maxKey)
+        {
+            batchKey = delegate.next();
+            batchIndex = 1;
+            r = batchKey;
+        }
+        return r;
+    }
+
+    @Override
+    public boolean isDeterministic()
+    {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
new file mode 100644
index 0000000..cdd6d39
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
@@ -0,0 +1,33 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class KeyGen
+{
+
+    final DataGen dataGen;
+    final int keySize;
+    final List<ByteBuffer> keyBuffers = new ArrayList<>();
+
+    public KeyGen(DataGen dataGen, int keySize)
+    {
+        this.dataGen = dataGen;
+        this.keySize = keySize;
+    }
+
+    public List<ByteBuffer> getKeys(int n, long index)
+    {
+        while (keyBuffers.size() < n)
+            keyBuffers.add(ByteBuffer.wrap(new byte[keySize]));
+        dataGen.generate(keyBuffers, index);
+        return keyBuffers;
+    }
+
+    public boolean isDeterministic()
+    {
+        return dataGen.isDeterministic();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
new file mode 100644
index 0000000..869fbc7
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
@@ -0,0 +1,31 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Generates a row of data, by constructing one byte buffer per column according to some algorithm
+ * and delegating the work of populating the values of those byte buffers to the provided data generator
+ */
+public abstract class RowGen
+{
+
+    final DataGen dataGen;
+    protected RowGen(DataGen dataGenerator)
+    {
+        this.dataGen = dataGenerator;
+    }
+
+    public List<ByteBuffer> generate(long operationIndex)
+    {
+        List<ByteBuffer> fill = getColumns(operationIndex);
+        dataGen.generate(fill, operationIndex);
+        return fill;
+    }
+
+    // these byte[] may be re-used
+    abstract List<ByteBuffer> getColumns(long operationIndex);
+
+    abstract public boolean isDeterministic();
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
new file mode 100644
index 0000000..b68ab3c
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
@@ -0,0 +1,84 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class RowGenDistributedSize extends RowGen
+{
+
+    // TODO - make configurable
+    static final int MAX_SINGLE_CACHE_SIZE = 16 * 1024;
+
+    final Distribution countDistribution;
+    final Distribution sizeDistribution;
+
+    final TreeMap<Integer, ByteBuffer> cache = new TreeMap<>();
+
+    // array re-used for returning columns
+    final ByteBuffer[] ret;
+    final int[] sizes;
+
+    public RowGenDistributedSize(DataGen dataGenerator, Distribution countDistribution, Distribution sizeDistribution)
+    {
+        super(dataGenerator);
+        this.countDistribution = countDistribution;
+        this.sizeDistribution = sizeDistribution;
+        ret = new ByteBuffer[(int) countDistribution.maxValue()];
+        sizes = new int[ret.length];
+    }
+
+    ByteBuffer getBuffer(int size)
+    {
+        if (size >= MAX_SINGLE_CACHE_SIZE)
+            return ByteBuffer.allocate(size);
+        Map.Entry<Integer, ByteBuffer> found = cache.ceilingEntry(size);
+        if (found == null)
+        {
+            // remove the next entry down, and replace it with a cache of this size
+            Integer del = cache.lowerKey(size);
+            if (del != null)
+                cache.remove(del);
+            return ByteBuffer.allocate(size);
+        }
+        ByteBuffer r = found.getValue();
+        cache.remove(found.getKey());
+        return r;
+    }
+
+    // Draws a column count and a size per column, then returns one slice of exactly that size per column.
+    // The returned list is a view over the re-used ret array, so it is overwritten by the next call.
+    @Override
+    List<ByteBuffer> getColumns(long operationIndex)
+    {
+        int i = 0;
+        int count = (int) countDistribution.next();
+        while (i < count)
+        {
+            int columnSize = (int) sizeDistribution.next();
+            sizes[i] = columnSize;
+            ret[i++] = getBuffer(columnSize);
+        }
+        // release every slice left over from a longer previous row; i must advance or only one slot is cleared
+        while (i < ret.length && ret[i] != null)
+            ret[i++] = null;
+        for (i = 0; i < count; i++)
+        {
+            ByteBuffer b = ret[i];
+            cache.put(b.capacity(), b); // register the backing buffer in the cache under its capacity
+            b.position(b.capacity() - sizes[i]); // so that the slice covers only the last sizes[i] bytes
+            ret[i] = b.slice();
+            b.position(0);
+        }
+        return Arrays.asList(ret).subList(0, count);
+    }
+
+    @Override
+    public boolean isDeterministic()
+    {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java
deleted file mode 100644
index 54737a4..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.nio.ByteBuffer;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.ThriftConversion;
-
-public abstract class CQLOperation extends Operation
-{
-    public CQLOperation(Session client, int idx)
-    {
-        super(client, idx);
-    }
-
-    protected abstract void run(CQLQueryExecutor executor) throws IOException;
-
-    protected abstract boolean validateThriftResult(CqlResult result);
-
-    protected abstract boolean validateNativeResult(ResultMessage result);
-
-    public void run(final CassandraClient client) throws IOException
-    {
-        run(new CQLQueryExecutor()
-        {
-            public boolean execute(String cqlQuery, List<String> queryParams) throws Exception
-            {
-                CqlResult result = null;
-                if (session.usePreparedStatements())
-                {
-                    Integer stmntId = getPreparedStatement(client, cqlQuery);
-                    if (session.cqlVersion.startsWith("3"))
-                        result = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParams), session.getConsistencyLevel());
-                    else
-                        result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams));
-                }
-                else
-                {
-                    String formattedQuery = formatCqlQuery(cqlQuery, queryParams);
-                    if (session.cqlVersion.startsWith("3"))
-                        result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
-                    else
-                        result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
-                }
-                return validateThriftResult(result);
-            }
-        });
-    }
-
-    public void run(final SimpleClient client) throws IOException
-    {
-        run(new CQLQueryExecutor()
-        {
-            public boolean execute(String cqlQuery, List<String> queryParams) throws Exception
-            {
-                ResultMessage result = null;
-                if (session.usePreparedStatements())
-                {
-                    byte[] stmntId = getPreparedStatement(client, cqlQuery);
-                    result = client.executePrepared(stmntId, queryParamsAsByteBuffer(queryParams), ThriftConversion.fromThrift(session.getConsistencyLevel()));
-                }
-                else
-                {
-                    String formattedQuery = formatCqlQuery(cqlQuery, queryParams);
-                    result = client.execute(formattedQuery, ThriftConversion.fromThrift(session.getConsistencyLevel()));
-                }
-                return validateNativeResult(result);
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
deleted file mode 100644
index ab6ae9d..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class CounterAdder extends Operation
-{
-    public CounterAdder(Session client, int index)
-    {
-        super(client, index);
-    }
-
-    public void run(CassandraClient client) throws IOException
-    {
-        List<CounterColumn> columns = new ArrayList<CounterColumn>();
-        List<CounterSuperColumn> superColumns = new ArrayList<CounterSuperColumn>();
-
-        // format used for keys
-        String format = "%0" + session.getTotalKeysLength() + "d";
-
-        for (int i = 0; i < session.getColumnsPerKey(); i++)
-        {
-            String columnName = ("C" + Integer.toString(i));
-
-            columns.add(new CounterColumn(ByteBufferUtil.bytes(columnName), 1L));
-        }
-
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-        {
-            // supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
-            for (int i = 0; i < session.getSuperColumns(); i++)
-            {
-                String superColumnName = "S" + Integer.toString(i);
-                superColumns.add(new CounterSuperColumn(ByteBuffer.wrap(superColumnName.getBytes()), columns));
-            }
-        }
-
-        String rawKey = String.format(format, index);
-        Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-
-        record.put(ByteBufferUtil.bytes(rawKey), session.getColumnFamilyType() == ColumnFamilyType.Super
-                                                                                ? getSuperColumnsMutationMap(superColumns)
-                                                                                : getColumnsMutationMap(columns));
-
-        TimerContext context = session.latency.time();
-
-        boolean success = false;
-        String exceptionMessage = null;
-
-        for (int t = 0; t < session.getRetryTimes(); t++)
-        {
-            if (success)
-                break;
-
-            try
-            {
-                client.batch_mutate(record, session.getConsistencyLevel());
-                success = true;
-            }
-            catch (Exception e)
-            {
-                exceptionMessage = getExceptionMessage(e);
-                success = false;
-            }
-        }
-
-        if (!success)
-        {
-            error(String.format("Operation [%d] retried %d times - error incrementing key %s %s%n",
-                                index,
-                                session.getRetryTimes(),
-                                rawKey,
-                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-        }
-
-        session.operations.getAndIncrement();
-        session.keys.getAndIncrement();
-        context.stop();
-    }
-
-    private Map<String, List<Mutation>> getSuperColumnsMutationMap(List<CounterSuperColumn> superColumns)
-    {
-        List<Mutation> mutations = new ArrayList<Mutation>();
-        Map<String, List<Mutation>> mutationMap = new HashMap<String, List<Mutation>>();
-
-        for (CounterSuperColumn s : superColumns)
-        {
-            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_super_column(s);
-            mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
-        }
-
-        mutationMap.put("SuperCounter1", mutations);
-
-        return mutationMap;
-    }
-
-    private Map<String, List<Mutation>> getColumnsMutationMap(List<CounterColumn> columns)
-    {
-        List<Mutation> mutations = new ArrayList<Mutation>();
-        Map<String, List<Mutation>> mutationMap = new HashMap<String, List<Mutation>>();
-
-        for (CounterColumn c : columns)
-        {
-            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_column(c);
-            mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
-        }
-
-        mutationMap.put("Counter1", mutations);
-
-        return mutationMap;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
deleted file mode 100644
index 56ef243..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class CounterGetter extends Operation
-{
-    public CounterGetter(Session client, int index)
-    {
-        super(client, index);
-    }
-
-    public void run(CassandraClient client) throws IOException
-    {
-        SliceRange sliceRange = new SliceRange();
-
-        // start/finish
-        sliceRange.setStart(new byte[] {}).setFinish(new byte[] {});
-
-        // reversed/count
-        sliceRange.setReversed(false).setCount(session.getColumnsPerKey());
-
-        // initialize SlicePredicate with existing SliceRange
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(sliceRange);
-
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-        {
-            runSuperCounterGetter(predicate, client);
-        }
-        else
-        {
-            runCounterGetter(predicate, client);
-        }
-    }
-
-    private void runSuperCounterGetter(SlicePredicate predicate, Cassandra.Client client) throws IOException
-    {
-        byte[] rawKey = generateKey();
-        ByteBuffer key = ByteBuffer.wrap(rawKey);
-
-        for (int j = 0; j < session.getSuperColumns(); j++)
-        {
-            String superColumn = 'S' + Integer.toString(j);
-            ColumnParent parent = new ColumnParent("SuperCounter1").setSuper_column(superColumn.getBytes());
-
-            TimerContext context = session.latency.time();
-
-            boolean success = false;
-            String exceptionMessage = null;
-
-            for (int t = 0; t < session.getRetryTimes(); t++)
-            {
-                if (success)
-                    break;
-
-                try
-                {
-                    List<ColumnOrSuperColumn> counters;
-                    counters = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
-                    success = (counters.size() != 0);
-                }
-                catch (Exception e)
-                {
-                    exceptionMessage = getExceptionMessage(e);
-                    success = false;
-                }
-            }
-
-            if (!success)
-            {
-                error(String.format("Operation [%d] retried %d times - error reading counter key %s %s%n",
-                                    index,
-                                    session.getRetryTimes(),
-                                    new String(rawKey),
-                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-            }
-
-            session.operations.getAndIncrement();
-            session.keys.getAndIncrement();
-            context.stop();
-        }
-    }
-
-    private void runCounterGetter(SlicePredicate predicate, Cassandra.Client client) throws IOException
-    {
-        ColumnParent parent = new ColumnParent("Counter1");
-
-        byte[] key = generateKey();
-        ByteBuffer keyBuffer = ByteBuffer.wrap(key);
-
-        TimerContext context = session.latency.time();
-
-        boolean success = false;
-        String exceptionMessage = null;
-
-        for (int t = 0; t < session.getRetryTimes(); t++)
-        {
-            if (success)
-                break;
-
-            try
-            {
-                List<ColumnOrSuperColumn> counters;
-                counters = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
-                success = (counters.size() != 0);
-            }
-            catch (Exception e)
-            {
-                exceptionMessage = getExceptionMessage(e);
-                success = false;
-            }
-        }
-
-        if (!success)
-        {
-            error(String.format("Operation [%d] retried %d times - error reading counter key %s %s%n",
-                                index,
-                                session.getRetryTimes(),
-                                new String(key),
-                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-        }
-
-        session.operations.getAndIncrement();
-        session.keys.getAndIncrement();
-        context.stop();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
index 31e8371..8e1f137 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
@@ -21,102 +21,50 @@ package org.apache.cassandra.stress.operations;
  */
 
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class CqlCounterAdder extends CQLOperation
+public class CqlCounterAdder extends CqlOperation<Integer>
 {
-    private static String cqlQuery = null;
-
-    public CqlCounterAdder(Session client, int idx)
+    public CqlCounterAdder(State state, long idx)
     {
-        super(client, idx);
+        super(state, idx);
     }
 
-    protected void run(CQLQueryExecutor executor) throws IOException
+    @Override
+    protected String buildQuery()
     {
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-            throw new RuntimeException("Super columns are not implemented for CQL");
-
-        if (cqlQuery == null)
-        {
-            String counterCF = session.cqlVersion.startsWith("2") ? "Counter1" : "Counter3";
-
-            StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(counterCF));
-
-            if (session.cqlVersion.startsWith("2"))
-                query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel());
-
-            query.append(" SET ");
+        String counterCF = state.isCql2() ? "Counter1" : "Counter3";
 
-            for (int i = 0; i < session.getColumnsPerKey(); i++)
-            {
-                if (i > 0)
-                    query.append(",");
+        StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(counterCF));
 
-                query.append('C').append(i).append("=C").append(i).append("+1");
-            }
-            query.append(" WHERE KEY=?");
-            cqlQuery = query.toString();
-        }
-
-        String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
-        List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
+        if (state.isCql2())
+            query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
 
-        TimerContext context = session.latency.time();
+        query.append(" SET ");
 
-        boolean success = false;
-        String exceptionMessage = null;
-
-        for (int t = 0; t < session.getRetryTimes(); t++)
+        // TODO : increment distribution subset of columns
+        for (int i = 0; i < state.settings.columns.maxColumnsPerKey; i++)
         {
-            if (success)
-                break;
+            if (i > 0)
+                query.append(",");
 
-            try
-            {
-                success = executor.execute(cqlQuery, queryParams);
-            }
-            catch (Exception e)
-            {
-                exceptionMessage = getExceptionMessage(e);
-                success = false;
-            }
+            query.append('C').append(i).append("=C").append(i).append("+1");
         }
-
-        if (!success)
-        {
-            error(String.format("Operation [%d] retried %d times - error incrementing key %s %s%n",
-                                index,
-                                session.getRetryTimes(),
-                                key,
-                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-        }
-
-        session.operations.getAndIncrement();
-        session.keys.getAndIncrement();
-        context.stop();
+        query.append(" WHERE KEY=?");
+        return query.toString();
     }
 
-    protected boolean validateThriftResult(CqlResult result)
+    @Override
+    protected List<ByteBuffer> getQueryParameters(byte[] key)
     {
-        return true;
+        return Collections.singletonList(ByteBuffer.wrap(key));
     }
 
-    protected boolean validateNativeResult(ResultMessage result)
+    @Override
+    protected CqlRunOp buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
     {
-        return true;
+        return new CqlRunOpAlwaysSucceed(client, query, queryId, params, keyid, key, 1);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
index a4d037a..0a0b05b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
@@ -21,100 +21,48 @@ package org.apache.cassandra.stress.operations;
  */
 
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlResultType;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class CqlCounterGetter extends CQLOperation
+public class CqlCounterGetter extends CqlOperation<Integer>
 {
-    private static String cqlQuery = null;
 
-    public CqlCounterGetter(Session client, int idx)
+    public CqlCounterGetter(State state, long idx)
     {
-        super(client, idx);
+        super(state, idx);
     }
 
-    protected void run(CQLQueryExecutor executor) throws IOException
+    @Override
+    protected List<ByteBuffer> getQueryParameters(byte[] key)
     {
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-            throw new RuntimeException("Super columns are not implemented for CQL");
-
-        if (cqlQuery == null)
-        {
-            StringBuilder query = new StringBuilder("SELECT ");
-
-            if (session.cqlVersion.startsWith("2"))
-                query.append("FIRST ").append(session.getColumnsPerKey()).append(" ''..''");
-            else
-                query.append("*");
-
-            String counterCF = session.cqlVersion.startsWith("2") ? "Counter1" : "Counter3";
-
-            query.append(" FROM ").append(wrapInQuotesIfRequired(counterCF));
-
-            if (session.cqlVersion.startsWith("2"))
-                query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
-
-            cqlQuery = query.append(" WHERE KEY=?").toString();
-        }
-
-        byte[] key = generateKey();
-        List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
+        return Collections.singletonList(ByteBuffer.wrap(key));
+    }
 
-        TimerContext context = session.latency.time();
+    @Override
+    protected String buildQuery()
+    {
+        StringBuilder query = new StringBuilder("SELECT ");
 
-        boolean success = false;
-        String exceptionMessage = null;
+        if (state.isCql2())
+            query.append("FIRST ").append(state.settings.columns.maxColumnsPerKey).append(" ''..''");
+        else
+            query.append("*");
 
-        for (int t = 0; t < session.getRetryTimes(); t++)
-        {
-            if (success)
-                break;
+        String counterCF = state.isCql2() ? "Counter1" : "Counter3";
 
-            try
-            {
-                success = executor.execute(cqlQuery, queryParams);
-            }
-            catch (Exception e)
-            {
-                exceptionMessage = getExceptionMessage(e);
-                success = false;
-            }
-        }
+        query.append(" FROM ").append(wrapInQuotesIfRequired(counterCF));
 
-        if (!success)
-        {
-            error(String.format("Operation [%d] retried %d times - error reading counter key %s %s%n",
-                                index,
-                                session.getRetryTimes(),
-                                new String(key),
-                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-        }
+        if (state.isCql2())
+            query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
 
-        session.operations.getAndIncrement();
-        session.keys.getAndIncrement();
-        context.stop();
+        return query.append(" WHERE KEY=?").toString();
     }
 
-    protected boolean validateThriftResult(CqlResult result)
+    @Override
+    protected CqlRunOp buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
     {
-        return result.rows.get(0).columns.size() != 0;
+        return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key);
     }
 
-    protected boolean validateNativeResult(ResultMessage result)
-    {
-        return result instanceof ResultMessage.Rows && ((ResultMessage.Rows)result).result.size() != 0;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
index bf416cc..748bf30 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
@@ -1,179 +1,123 @@
 package org.apache.cassandra.stress.operations;
 /*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
 
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.cql3.ResultSet;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlRow;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class CqlIndexedRangeSlicer extends CQLOperation
+import org.apache.cassandra.stress.settings.SettingsCommandMulti;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CqlIndexedRangeSlicer extends CqlOperation<byte[][]>
 {
-    private static List<ByteBuffer> values = null;
-    private static String cqlQuery = null;
 
-    private int lastQueryResultSize;
-    private int lastMaxKey;
+    volatile boolean acceptNoResults = false;
 
-    public CqlIndexedRangeSlicer(Session client, int idx)
+    public CqlIndexedRangeSlicer(State state, long idx)
     {
-        super(client, idx);
+        super(state, idx);
     }
 
-    protected void run(CQLQueryExecutor executor) throws IOException
+    @Override
+    protected List<ByteBuffer> getQueryParameters(byte[] key)
     {
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-            throw new RuntimeException("Super columns are not implemented for CQL");
-
-        if (values == null)
-            values = generateValues();
-
-        if (cqlQuery == null)
-        {
-            StringBuilder query = new StringBuilder("SELECT ");
-
-            if (session.cqlVersion.startsWith("2"))
-                query.append(session.getColumnsPerKey()).append(" ''..''");
-            else
-                query.append("*");
-
-            query.append(" FROM Standard1");
-
-            if (session.cqlVersion.startsWith("2"))
-                query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel());
+        throw new UnsupportedOperationException();
+    }
 
-            query.append(" WHERE C1=").append(getUnQuotedCqlBlob(values.get(1).array(), session.cqlVersion.startsWith("3")))
-                 .append(" AND KEY > ? LIMIT ").append(session.getKeysPerCall());
+    @Override
+    protected String buildQuery()
+    {
+        StringBuilder query = new StringBuilder("SELECT ");
 
-            cqlQuery = query.toString();
-        }
+        if (state.isCql2())
+            query.append(state.settings.columns.maxColumnsPerKey).append(" ''..''");
+        else
+            query.append("*");
 
-        String format = "%0" + session.getTotalKeysLength() + "d";
-        String startOffset = String.format(format, 0);
+        query.append(" FROM Standard1");
 
-        int expectedPerValue = session.getNumKeys() / values.size(), received = 0;
+        if (state.isCql2())
+            query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
 
-        while (received < expectedPerValue)
-        {
-            TimerContext context = session.latency.time();
-
-            boolean success = false;
-            String exceptionMessage = null;
-            String formattedQuery = null;
-            List<String> queryParms = Collections.singletonList(getUnQuotedCqlBlob(startOffset, session.cqlVersion.startsWith("3")));
-
-            for (int t = 0; t < session.getRetryTimes(); t++)
-            {
-                if (success)
-                    break;
-
-                try
-                {
-                    success = executor.execute(cqlQuery, queryParms);
-                }
-                catch (Exception e)
-                {
-                    exceptionMessage = getExceptionMessage(e);
-                    success = false;
-                }
-            }
-
-            if (!success)
-            {
-                error(String.format("Operation [%d] retried %d times - error executing indexed range query with offset %s %s%n",
-                                    index,
-                                    session.getRetryTimes(),
-                                    startOffset,
-                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
-            }
-
-            received += lastQueryResultSize;
-
-            // convert max key found back to an integer, and increment it
-            startOffset = String.format(format, (1 + lastMaxKey));
-
-            session.operations.getAndIncrement();
-            session.keys.getAndAdd(lastQueryResultSize);
-            context.stop();
-        }
+        final String columnName = getColumnName(1);
+        query.append(" WHERE ").append(columnName).append("=?")
+                .append(" AND KEY > ? LIMIT ").append(((SettingsCommandMulti)state.settings.command).keysAtOnce);
+        return query.toString();
     }
 
-    /**
-     * Get maximum key from CqlRow list
-     * @param rows list of the CqlRow objects
-     * @return maximum key value of the list
-     */
-    private int getMaxKey(List<CqlRow> rows)
+    @Override
+    protected void run(CqlOperation.ClientWrapper client) throws IOException
     {
-        int maxKey = ByteBufferUtil.toInt(rows.get(0).key);
-
-        for (CqlRow row : rows)
+        acceptNoResults = false;
+        final List<ByteBuffer> columns = generateColumnValues();
+        final ByteBuffer value = columns.get(1); // only C1 column is indexed
+        byte[] minKey = new byte[0];
+        int rowCount;
+        do
         {
-            int currentKey = ByteBufferUtil.toInt(row.key);
-            if (currentKey > maxKey)
-                maxKey = currentKey;
-        }
-
-        return maxKey;
+            List<ByteBuffer> params = Arrays.asList(value, ByteBuffer.wrap(minKey));
+            CqlRunOp<byte[][]> op = run(client, params, value, new String(value.array()));
+            byte[][] keys = op.result;
+            rowCount = keys.length;
+            minKey = getNextMinKey(minKey, keys);
+            acceptNoResults = true;
+        } while (rowCount > 0);
     }
 
-    private int getMaxKey(ResultSet rs)
+    private final class IndexedRangeSliceRunOp extends CqlRunOpFetchKeys
     {
-        int maxKey = ByteBufferUtil.toInt(rs.rows.get(0).get(0));
 
-        for (List<ByteBuffer> row : rs.rows)
+        protected IndexedRangeSliceRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
         {
-            int currentKey = ByteBufferUtil.toInt(row.get(0));
-            if (currentKey > maxKey)
-                maxKey = currentKey;
+            super(client, query, queryId, params, keyid, key);
         }
 
-        return maxKey;
+        @Override
+        public boolean validate(byte[][] result)
+        {
+            return acceptNoResults || result.length > 0;
+        }
     }
 
-    protected boolean validateThriftResult(CqlResult result)
+    @Override
+    protected CqlRunOp<byte[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
     {
-        lastQueryResultSize = result.rows.size();
-        lastMaxKey = getMaxKey(result.rows);
-        return lastQueryResultSize != 0;
+        return new IndexedRangeSliceRunOp(client, query, queryId, params, keyid, key);
     }
 
-    protected boolean validateNativeResult(ResultMessage result)
+    private static byte[] getNextMinKey(byte[] cur, byte[][] keys)
     {
-        assert result instanceof ResultMessage.Rows;
-        lastQueryResultSize = ((ResultMessage.Rows)result).result.size();
-        lastMaxKey = getMaxKey(((ResultMessage.Rows)result).result);
-        return lastQueryResultSize != 0;
+        // find max
+        for (byte[] key : keys)
+            if (FBUtilities.compareUnsigned(cur, key) < 0)
+                cur = key;
+
+        // increment
+        for (int i = 0 ; i < cur.length ; i++)
+            if (++cur[i] != 0)
+                break;
+        return cur;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
index d593e57..6b1577c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
@@ -21,126 +21,66 @@ package org.apache.cassandra.stress.operations;
  */
 
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.utils.UUIDGen;
 
-public class CqlInserter extends CQLOperation
+public class CqlInserter extends CqlOperation<Integer>
 {
-    private static List<ByteBuffer> values;
-    private static String cqlQuery = null;
 
-    public CqlInserter(Session client, int idx)
+    public CqlInserter(State state, long idx)
     {
-        super(client, idx);
+        super(state, idx);
     }
 
-    protected void run(CQLQueryExecutor executor) throws IOException
+    @Override
+    protected String buildQuery()
     {
-        if (session.getColumnFamilyType() == ColumnFamilyType.Super)
-            throw new RuntimeException("Super columns are not implemented for CQL");
+        StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(state.settings.schema.columnFamily));
 
-        if (values == null)
-            values = generateValues();
+        if (state.isCql2())
+            query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
 
-        // Construct a query string once.
-        if (cqlQuery == null)
-        {
-            StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired("Standard1"));
-
-            if (session.cqlVersion.startsWith("2"))
-                query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
-
-            query.append(" SET ");
-
-            for (int i = 0; i < session.getColumnsPerKey(); i++)
-            {
-                if (i > 0)
-                    query.append(',');
-
-                if (session.timeUUIDComparator)
-                {
-                    if (session.cqlVersion.startsWith("3"))
-                        throw new UnsupportedOperationException("Cannot use UUIDs in column names with CQL3");
-
-                    query.append(wrapInQuotesIfRequired(UUIDGen.getTimeUUID().toString()))
-                         .append(" = ?");
-                }
-                else
-                {
-                    query.append(wrapInQuotesIfRequired("C" + i)).append(" = ?");
-                }
-            }
-
-            query.append(" WHERE KEY=?");
-            cqlQuery = query.toString();
-        }
+        query.append(" SET ");
 
-        List<String> queryParms = new ArrayList<String>();
-        for (int i = 0; i < session.getColumnsPerKey(); i++)
+        for (int i = 0 ; i < state.settings.columns.maxColumnsPerKey; i++)
         {
-            // Cell value
-            queryParms.add(getUnQuotedCqlBlob(values.get(i % values.size()).array(), session.cqlVersion.startsWith("3")));
-        }
-
-        String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
-        queryParms.add(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
-
-        TimerContext context = session.latency.time();
-
-        boolean success = false;
-        String exceptionMessage = null;
-
-        for (int t = 0; t < session.getRetryTimes(); t++)
-        {
-            if (success)
-                break;
+            if (i > 0)
+                query.append(',');
 
-            try
+            if (state.settings.columns.useTimeUUIDComparator)
             {
-                success = executor.execute(cqlQuery, queryParms);
+                if (state.isCql3())
+                    throw new UnsupportedOperationException("Cannot use UUIDs in column names with CQL3");
+
+                query.append(wrapInQuotesIfRequired(UUIDGen.getTimeUUID().toString()))
+                        .append(" = ?");
             }
-            catch (Exception e)
+            else
             {
-                exceptionMessage = getExceptionMessage(e);
-                success = false;
+                query.append(wrapInQuotesIfRequired("C" + i)).append(" = ?");
             }
         }
 
-        if (!success)
-        {
-            error(String.format("Operation [%d] retried %d times - error inserting key %s %s%n with query %s",
-                                index,
-                                session.getRetryTimes(),
-                                key,
-                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")",
-                                cqlQuery));
-        }
-
-        session.operations.getAndIncrement();
-        session.keys.getAndIncrement();
-        context.stop();
+        query.append(" WHERE KEY=?");
+        return query.toString();
     }
 
-    protected boolean validateThriftResult(CqlResult result)
+    @Override
+    protected List<ByteBuffer> getQueryParameters(byte[] key)
     {
-        return true;
+        final ArrayList<ByteBuffer> queryParams = new ArrayList<>();
+        final List<ByteBuffer> values = generateColumnValues();
+        queryParams.addAll(values);
+        queryParams.add(ByteBuffer.wrap(key));
+        return queryParams;
     }
 
-    protected boolean validateNativeResult(ResultMessage result)
+    @Override
+    protected CqlRunOp buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
     {
-        return true;
+        return new CqlRunOpAlwaysSucceed(client, query, queryId, params, keyid, key, 1);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
index ec645d4..80a7118 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
@@ -23,25 +23,20 @@ package org.apache.cassandra.stress.operations;
 
 import java.io.IOException;
 
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.util.ThriftClient;
 
 public class CqlMultiGetter extends Operation
 {
-    public CqlMultiGetter(Session client, int idx)
-    {
-        super(client, idx);
-    }
-
-    public void run(CassandraClient client) throws IOException
+    public CqlMultiGetter(State state, long idx)
     {
+        super(state, idx);
         throw new RuntimeException("Multiget is not implemented for CQL");
     }
 
-    public void run(SimpleClient client) throws IOException
+    @Override
+    public void run(ThriftClient client) throws IOException
     {
-        throw new RuntimeException("Multiget is not implemented for CQL");
     }
+
 }


Mime
View raw message