cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [1/2] cassandra git commit: cassandra-stress prints per operation statistics, plus misc other stress updates
Date Tue, 10 Mar 2015 14:26:42 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk bf9c50313 -> a0586f692


cassandra-stress prints per operation statistics, plus misc other stress updates

patch by anthony cozzie, reviewed by benedict for CASSANDRA-8769


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6bbfb557
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6bbfb557
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6bbfb557

Branch: refs/heads/trunk
Commit: 6bbfb5574e0487d490eb5872c70853c4c56d2940
Parents: ef6fa37
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Tue Mar 10 14:19:23 2015 +0000
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Tue Mar 10 14:19:23 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/stress/Operation.java  |   2 +-
 .../apache/cassandra/stress/StressAction.java   |  46 +++++----
 .../apache/cassandra/stress/StressMetrics.java  |  83 ++++++++++-----
 .../apache/cassandra/stress/StressProfile.java  |  15 ++-
 .../stress/operations/FixedOpDistribution.java  |  10 ++
 .../stress/operations/OpDistribution.java       |   2 +
 .../operations/OpDistributionFactory.java       |   6 +-
 .../operations/SampledOpDistribution.java       |  17 ++++
 .../SampledOpDistributionFactory.java           |  10 +-
 .../cassandra/stress/settings/Command.java      |  27 +++--
 .../cassandra/stress/settings/Option.java       |   1 +
 .../stress/settings/OptionAnyProbabilities.java |   5 +
 .../stress/settings/OptionDistribution.java     |  17 ++--
 .../cassandra/stress/settings/OptionMulti.java  |  15 ++-
 .../settings/OptionRatioDistribution.java       |   5 +
 .../stress/settings/OptionReplication.java      |   2 +-
 .../stress/settings/SettingsCommand.java        |  48 ++++++---
 .../settings/SettingsCommandPreDefined.java     |  12 ++-
 .../stress/settings/SettingsCommandUser.java    |   5 +
 .../org/apache/cassandra/stress/util/Timer.java |  10 +-
 .../apache/cassandra/stress/util/Timing.java    |  75 +++++++++-----
 .../cassandra/stress/util/TimingInterval.java   | 102 +++++++++++++------
 23 files changed, 355 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b2ac1aa..af5206b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769)
  * Add SimpleDate (cql date) and Time (cql time) types (CASSANDRA-7523)
  * Use long for key count in cfstats (CASSANDRA-8913)
  * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 05045f8..f4ac5ee 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -176,7 +176,7 @@ public abstract class Operation
             }
         }
 
-        timer.stop(run.partitionCount(), run.rowCount());
+        timer.stop(run.partitionCount(), run.rowCount(), !success);
 
         if (!success)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 1433742..f906a55 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -30,10 +30,11 @@ import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.stress.operations.OpDistribution;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
+import org.apache.cassandra.stress.settings.SettingsCommand;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.stress.util.TimingInterval;
 import org.apache.cassandra.transport.SimpleClient;
 
 public class StressAction implements Runnable
@@ -53,13 +54,14 @@ public class StressAction implements Runnable
         // creating keyspace and column families
         settings.maybeCreateKeyspaces();
 
-        // TODO: warmup should operate configurably over op/pk/row, and be of configurable length
-        if (!settings.command.noWarmup)
-            warmup(settings.command.getFactory(settings));
-
         output.println("Sleeping 2s...");
         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
 
+        if (!settings.command.noWarmup)
+            warmup(settings.command.getFactory(settings));
+        if (settings.command.truncate == SettingsCommand.TruncateWhen.ONCE)
+            settings.command.truncateTables(settings);
+
         // TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution
         RateLimiter rateLimiter = null;
         if (settings.rate.opRateTargetPerSecond > 0)
@@ -86,12 +88,19 @@ public class StressAction implements Runnable
         // warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
         PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
         int iterations = 50000 * settings.node.nodes.size();
+        int threads = 20;
+
+        if (settings.rate.maxThreads > 0)
+            threads = Math.min(threads, settings.rate.maxThreads);
+        if (settings.rate.threadCount > 0)
+            threads = Math.min(threads, settings.rate.threadCount);
+
         for (OpDistributionFactory single : operations.each())
         {
             // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
             // so warm up all the nodes we're speaking to only.
             output.println(String.format("Warming up %s with %d iterations...", single.desc(), iterations));
-            run(single, 20, iterations, 0, null, null, warmupOutput);
+            run(single, threads, iterations, 0, null, null, warmupOutput);
         }
     }
 
@@ -109,6 +118,9 @@ public class StressAction implements Runnable
         {
             output.println(String.format("Running with %d threadCount", threadCount));
 
+            if (settings.command.truncate == SettingsCommand.TruncateWhen.ALWAYS)
+                settings.command.truncateTables(settings);
+
             StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count,
                                        settings.command.duration, rateLimiter, settings.command.durationUnits, output);
             if (result == null)
@@ -146,7 +158,7 @@ public class StressAction implements Runnable
         } while (!auto || (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results, 5, settings.command.targetUncertainty)));
 
         // summarise all results
-        StressMetrics.summarise(runIds, results, output);
+        StressMetrics.summarise(runIds, results, output, settings.samples.historyCount);
         return true;
     }
 
@@ -187,8 +199,8 @@ public class StressAction implements Runnable
         final Consumer[] consumers = new Consumer[threadCount];
         for (int i = 0; i < threadCount; i++)
         {
-            Timer timer = metrics.getTiming().newTimer(settings.samples.liveCount / threadCount);
-            consumers[i] = new Consumer(operations, done, workManager, timer, metrics, rateLimiter);
+            consumers[i] = new Consumer(operations, done, workManager, metrics, rateLimiter,
+                                        settings.samples.liveCount / threadCount);
         }
 
         // starting worker threadCount
@@ -240,28 +252,27 @@ public class StressAction implements Runnable
 
         private final OpDistribution operations;
         private final StressMetrics metrics;
-        private final Timer timer;
         private final RateLimiter rateLimiter;
         private volatile boolean success = true;
         private final WorkManager workManager;
         private final CountDownLatch done;
 
-        public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, Timer timer, StressMetrics metrics, RateLimiter rateLimiter)
+        public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, StressMetrics metrics,
+                        RateLimiter rateLimiter, int sampleCount)
         {
             this.done = done;
             this.rateLimiter = rateLimiter;
             this.workManager = workManager;
             this.metrics = metrics;
-            this.timer = timer;
-            this.operations = operations.get(timer);
+            this.operations = operations.get(metrics.getTiming(), sampleCount);
         }
 
         public void run()
         {
-            timer.init();
+            operations.initTimers();
+
             try
             {
-
                 SimpleClient sclient = null;
                 ThriftClient tclient = null;
                 JavaDriverClient jclient = null;
@@ -324,11 +335,8 @@ public class StressAction implements Runnable
             finally
             {
                 done.countDown();
-                timer.close();
+                operations.closeTimers();
             }
-
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index d1cc0d4..46ca488 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -24,18 +24,16 @@ package org.apache.cassandra.stress;
 import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.cassandra.stress.util.*;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.JmxCollector;
-import org.apache.cassandra.stress.util.Timing;
-import org.apache.cassandra.stress.util.TimingInterval;
-import org.apache.cassandra.stress.util.Uncertainty;
 
 public class StressMetrics
 {
@@ -51,10 +49,12 @@ public class StressMetrics
     private final Timing timing;
     private final Callable<JmxCollector.GcStats> gcStatsCollector;
     private volatile JmxCollector.GcStats totalGcStats;
+    private final StressSettings settings;
 
     public StressMetrics(PrintStream output, final long logIntervalMillis, StressSettings settings)
     {
         this.output = output;
+        this.settings = settings;
         Callable<JmxCollector.GcStats> gcStatsCollector;
         totalGcStats = new JmxCollector.GcStats(0);
         try
@@ -93,7 +93,9 @@ public class StressMetrics
                     {
                         try
                         {
-                            long sleep = timing.getHistory().endMillis() + logIntervalMillis - System.currentTimeMillis();
+                            long sleepNanos = timing.getHistory().endNanos() - System.nanoTime();
+                            long sleep = (sleepNanos / 1000000) + logIntervalMillis;
+
                             if (sleep < logIntervalMillis >>> 3)
                                 // if had a major hiccup, sleep full interval
                                 Thread.sleep(logIntervalMillis);
@@ -154,9 +156,19 @@ public class StressMetrics
     {
         Timing.TimingResult<JmxCollector.GcStats> result = timing.snap(gcStatsCollector);
         totalGcStats = JmxCollector.GcStats.aggregate(Arrays.asList(totalGcStats, result.extra));
-        if (result.timing.partitionCount != 0)
-            printRow("", result.timing, timing.getHistory(), result.extra, rowRateUncertainty, output);
-        rowRateUncertainty.update(result.timing.adjustedRowRate());
+        TimingInterval current = result.intervals.combine(settings.samples.reportCount);
+        TimingInterval history = timing.getHistory().combine(settings.samples.historyCount);
+        rowRateUncertainty.update(current.adjustedRowRate());
+        if (current.partitionCount != 0)
+        {
+            if (result.intervals.intervals().size() > 1)
+            {
+                for (Map.Entry<String, TimingInterval> type : result.intervals.intervals().entrySet())
+                    printRow("", type.getKey(), type.getValue(), timing.getHistory().get(type.getKey()), result.extra, rowRateUncertainty, output);
+            }
+
+            printRow("", "total", current, history, result.extra, rowRateUncertainty, output);
+        }
         if (timing.done())
             stop = true;
     }
@@ -164,19 +176,19 @@ public class StressMetrics
 
     // PRINT FORMATTING
 
-    public static final String HEADFORMAT = "%-10s,%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%8s,%8s,%8s,%8s";
-    public static final String ROWFORMAT =  "%-10d,%10.0f,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f";
+    public static final String HEADFORMAT = "%-10s%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%7s,%8s,%8s,%8s,%8s";
+    public static final String ROWFORMAT =  "%-10s%10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7d,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f";
 
     private static void printHeader(String prefix, PrintStream output)
     {
-        output.println(prefix + String.format(HEADFORMAT, "total ops","adj row/s","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "gc: #", "max ms", "sum ms", "sdv ms", "mb"));
+        output.println(prefix + String.format(HEADFORMAT, "type,", "total ops","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "errors", "gc: #", "max ms", "sum ms", "sdv ms", "mb"));
     }
 
-    private static void printRow(String prefix, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output)
+    private static void printRow(String prefix, String type, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output)
     {
         output.println(prefix + String.format(ROWFORMAT,
+                type + ",",
                 total.operationCount,
-                interval.adjustedRowRate(),
                 interval.opRate(),
                 interval.partitionRate(),
                 interval.rowRate(),
@@ -188,6 +200,7 @@ public class StressMetrics
                 interval.maxLatency(),
                 total.runTime() / 1000f,
                 opRateUncertainty.getUncertainty(),
+                interval.errorCount,
                 gcStats.count,
                 gcStats.maxms,
                 gcStats.summs,
@@ -200,16 +213,20 @@ public class StressMetrics
     {
         output.println("\n");
         output.println("Results:");
-        TimingInterval history = timing.getHistory();
-        output.println(String.format("op rate                   : %.0f", history.opRate()));
-        output.println(String.format("partition rate            : %.0f", history.partitionRate()));
-        output.println(String.format("row rate                  : %.0f", history.rowRate()));
-        output.println(String.format("latency mean              : %.1f", history.meanLatency()));
-        output.println(String.format("latency median            : %.1f", history.medianLatency()));
-        output.println(String.format("latency 95th percentile   : %.1f", history.rankLatency(.95f)));
-        output.println(String.format("latency 99th percentile   : %.1f", history.rankLatency(0.99f)));
-        output.println(String.format("latency 99.9th percentile : %.1f", history.rankLatency(0.999f)));
-        output.println(String.format("latency max               : %.1f", history.maxLatency()));
+
+        TimingIntervals opHistory = timing.getHistory();
+        TimingInterval history = opHistory.combine(settings.samples.historyCount);
+        output.println(String.format("op rate                   : %.0f %s", history.opRate(), opHistory.opRates()));
+        output.println(String.format("partition rate            : %.0f %s", history.partitionRate(), opHistory.partitionRates()));
+        output.println(String.format("row rate                  : %.0f %s", history.rowRate(), opHistory.rowRates()));
+        output.println(String.format("latency mean              : %.1f %s", history.meanLatency(), opHistory.meanLatencies()));
+        output.println(String.format("latency median            : %.1f %s", history.medianLatency(), opHistory.medianLatencies()));
+        output.println(String.format("latency 95th percentile   : %.1f %s", history.rankLatency(.95f), opHistory.rankLatencies(0.95f)));
+        output.println(String.format("latency 99th percentile   : %.1f %s", history.rankLatency(0.99f), opHistory.rankLatencies(0.99f)));
+        output.println(String.format("latency 99.9th percentile : %.1f %s", history.rankLatency(0.999f), opHistory.rankLatencies(0.999f)));
+        output.println(String.format("latency max               : %.1f %s", history.maxLatency(), opHistory.maxLatencies()));
+        output.println(String.format("Total partitions          : %d %s",   history.partitionCount, opHistory.partitionCounts()));
+        output.println(String.format("Total errors              : %d %s",   history.errorCount, opHistory.errorCounts()));
         output.println(String.format("total gc count            : %.0f", totalGcStats.count));
         output.println(String.format("total gc mb               : %.0f", totalGcStats.bytes / (1 << 20)));
         output.println(String.format("total gc time (s)         : %.0f", totalGcStats.summs / 1000));
@@ -219,7 +236,7 @@ public class StressMetrics
                 history.runTime(), "HH:mm:ss", true));
     }
 
-    public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out)
+    public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out, int historySampleCount)
     {
         int idLen = 0;
         for (String id : ids)
@@ -227,13 +244,27 @@ public class StressMetrics
         String formatstr = "%" + idLen + "s, ";
         printHeader(String.format(formatstr, "id"), out);
         for (int i = 0 ; i < ids.size() ; i++)
+        {
+            for (Map.Entry<String, TimingInterval> type : summarise.get(i).timing.getHistory().intervals().entrySet())
+            {
+                printRow(String.format(formatstr, ids.get(i)),
+                         type.getKey(),
+                         type.getValue(),
+                         type.getValue(),
+                         summarise.get(i).totalGcStats,
+                         summarise.get(i).rowRateUncertainty,
+                         out);
+            }
+            TimingInterval hist = summarise.get(i).timing.getHistory().combine(historySampleCount);
             printRow(String.format(formatstr, ids.get(i)),
-                    summarise.get(i).timing.getHistory(),
-                    summarise.get(i).timing.getHistory(),
+                    "total",
+                    hist,
+                    hist,
                     summarise.get(i).totalGcStats,
                     summarise.get(i).rowRateUncertainty,
                     out
             );
+        }
     }
 
     public Timing getTiming()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 687b3ae..6c73214 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -43,10 +43,7 @@ import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.generate.values.*;
 import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
 import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
-import org.apache.cassandra.stress.settings.OptionDistribution;
-import org.apache.cassandra.stress.settings.OptionRatioDistribution;
-import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.settings.ValidationType;
+import org.apache.cassandra.stress.settings.*;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.stress.util.Timer;
@@ -188,6 +185,16 @@ public class StressProfile implements Serializable
         maybeLoadSchemaInfo(settings);
     }
 
+    public void truncateTable(StressSettings settings)
+    {
+        JavaDriverClient client = settings.getJavaDriverClient(false);
+        assert settings.command.truncate != SettingsCommand.TruncateWhen.NEVER;
+        String cql = String.format("TRUNCATE %s.%s", keyspaceName, tableName);
+        client.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ONE);
+        System.out.println(String.format("Truncated %s.%s. Sleeping %ss for propagation.",
+                                         keyspaceName, tableName, settings.node.nodes.size()));
+        Uninterruptibles.sleepUninterruptibly(settings.node.nodes.size(), TimeUnit.SECONDS);
+    }
 
     private void maybeLoadSchemaInfo(StressSettings settings)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
index 533b630..f2616cf 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
@@ -36,4 +36,14 @@ public class FixedOpDistribution implements OpDistribution
     {
         return operation;
     }
+
+    public void initTimers()
+    {
+        operation.timer.init();
+    }
+
+    public void closeTimers()
+    {
+        operation.timer.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
index 0fc15a6..e09300a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
@@ -28,4 +28,6 @@ public interface OpDistribution
 
     Operation next();
 
+    public void initTimers();
+    public void closeTimers();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
index afbae7d..7e13fcd 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
@@ -21,13 +21,11 @@ package org.apache.cassandra.stress.operations;
  */
 
 
-import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.stress.util.Timing;
 
 public interface OpDistributionFactory
 {
-
-    public OpDistribution get(Timer timer);
+    public OpDistribution get(Timing timing, int sampleCount);
     public String desc();
     Iterable<OpDistributionFactory> each();
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
index 432e991..9698421 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
@@ -25,6 +25,7 @@ import org.apache.commons.math3.distribution.EnumeratedDistribution;
 
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.commons.math3.util.Pair;
 
 public class SampledOpDistribution implements OpDistribution
 {
@@ -50,4 +51,20 @@ public class SampledOpDistribution implements OpDistribution
         remaining--;
         return cur;
     }
+
+    public void initTimers()
+    {
+        for (Pair<Operation, Double> op : operations.getPmf())
+        {
+            op.getFirst().timer.init();
+        }
+    }
+
+    public void closeTimers()
+    {
+        for (Pair<Operation, Double> op : operations.getPmf())
+        {
+            op.getFirst().timer.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 9713e93..10191a6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.stress.util.Timing;
 import org.apache.commons.math3.distribution.EnumeratedDistribution;
 import org.apache.commons.math3.util.Pair;
 
@@ -49,12 +50,13 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
     protected abstract Operation get(Timer timer, PartitionGenerator generator, T key);
     protected abstract PartitionGenerator newGenerator();
 
-    public OpDistribution get(Timer timer)
+    public OpDistribution get(Timing timing, int sampleCount)
     {
         PartitionGenerator generator = newGenerator();
         List<Pair<Operation, Double>> operations = new ArrayList<>();
         for (Map.Entry<T, Double> ratio : ratios.entrySet())
-            operations.add(new Pair<>(get(timer, generator, ratio.getKey()), ratio.getValue()));
+            operations.add(new Pair<>(get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, ratio.getKey()),
+                                      ratio.getValue()));
         return new SampledOpDistribution(new EnumeratedDistribution<>(operations), clustering.get());
     }
 
@@ -73,9 +75,9 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
         {
             out.add(new OpDistributionFactory()
             {
-                public OpDistribution get(Timer timer)
+                public OpDistribution get(Timing timing, int sampleCount)
                 {
-                    return new FixedOpDistribution(SampledOpDistributionFactory.this.get(timer, newGenerator(), ratio.getKey()));
+                    return new FixedOpDistribution(SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount), newGenerator(), ratio.getKey()));
                 }
 
                 public String desc()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
index 9a93e34..c47c5d2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
@@ -31,38 +31,37 @@ import com.google.common.collect.ImmutableList;
 public enum Command
 {
 
-    READ(false, "standard1", "Super1",
+    READ(false, "standard1",
             "Multiple concurrent reads - the cluster must first be populated by a write test",
             CommandCategory.BASIC
     ),
-    WRITE(true, "standard1", "Super1",
+    WRITE(true, "standard1",
             "insert",
             "Multiple concurrent writes against the cluster",
             CommandCategory.BASIC
     ),
-    MIXED(true, null, null,
+    MIXED(true, null,
             "Interleaving of any basic commands, with configurable ratio and distribution - the cluster must first be populated by a write test",
             CommandCategory.MIXED
     ),
-    COUNTER_WRITE(true, "counter1", "SuperCounter1",
+    COUNTER_WRITE(true, "counter1",
             "counter_add",
             "Multiple concurrent updates of counters.",
             CommandCategory.BASIC
     ),
-    COUNTER_READ(false, "counter1", "SuperCounter1",
+    COUNTER_READ(false, "counter1",
             "counter_get",
             "Multiple concurrent reads of counters. The cluster must first be populated by a counterwrite test.",
             CommandCategory.BASIC
     ),
-    USER(true, null, null,
+    USER(true, null,
           "Interleaving of user provided queries, with configurable ratio and distribution",
           CommandCategory.USER
     ),
 
-    HELP(false, null, null, "-?", "Print help for a command or option", null),
-    PRINT(false, null, null, "Inspect the output of a distribution definition", null),
-    LEGACY(false, null, null, "Legacy support mode", null)
-
+    HELP(false, null, "-?", "Print help for a command or option", null),
+    PRINT(false, null, "Inspect the output of a distribution definition", null),
+    LEGACY(false, null, "Legacy support mode", null)
     ;
 
     private static final Map<String, Command> LOOKUP;
@@ -87,17 +86,15 @@ public enum Command
     public final List<String> names;
     public final String description;
     public final String table;
-    public final String supertable;
 
-    Command(boolean updates, String table, String supertable, String description, CommandCategory category)
+    Command(boolean updates, String table, String description, CommandCategory category)
     {
-        this(updates, table, supertable, null, description, category);
+        this(updates, table, null, description, category);
     }
 
-    Command(boolean updates, String table, String supertable, String extra, String description, CommandCategory category)
+    Command(boolean updates, String table, String extra, String description, CommandCategory category)
     {
         this.table = table;
-        this.supertable = supertable;
         this.updates = updates;
         this.category = category;
         List<String> names = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/Option.java b/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
index a9e669c..b9e402e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
@@ -32,6 +32,7 @@ abstract class Option
     abstract String longDisplay();
     abstract List<String> multiLineDisplay();
     abstract boolean setByUser();
+    abstract boolean present();
 
     public int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java
index 9c2f367..28d8f96 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java
@@ -73,6 +73,11 @@ public final class OptionAnyProbabilities extends OptionMulti
         {
             return !options.isEmpty();
         }
+
+        boolean present()
+        {
+            return setByUser();
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
index 45e832a..7186efb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
@@ -134,6 +134,11 @@ public class OptionDistribution extends Option
         return spec != null;
     }
 
+    boolean present()
+    {
+        return setByUser() || defaultSpec != null;
+    }
+
     @Override
     public String shortDisplay()
     {
@@ -209,7 +214,7 @@ public class OptionDistribution extends Option
                     stdev = ((max - min) / 2d) / stdevsToEdge;
                 }
                 return new GaussianFactory(min, max, mean, stdev);
-            } catch (Exception _)
+            } catch (Exception ignore)
             {
                 throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params);
             }
@@ -233,7 +238,7 @@ public class OptionDistribution extends Option
                 // over entire range, but this results in overly skewed distribution, so take sqrt
                 final double mean = (max - min) / findBounds.inverseCumulativeProbability(1d - Math.sqrt(1d/(max-min)));
                 return new ExpFactory(min, max, mean);
-            } catch (Exception _)
+            } catch (Exception ignore)
             {
                 throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params);
             }
@@ -258,7 +263,7 @@ public class OptionDistribution extends Option
                 // over entire range, but this results in overly skewed distribution, so take sqrt
                 final double scale = (max - min) / findBounds.inverseCumulativeProbability(1d - Math.sqrt(1d/(max-min)));
                 return new ExtremeFactory(min, max, shape, scale);
-            } catch (Exception _)
+            } catch (Exception ignore)
             {
                 throw new IllegalArgumentException("Invalid parameter list for extreme (Weibull) distribution: " + params);
             }
@@ -284,7 +289,7 @@ public class OptionDistribution extends Option
                 // over entire range, but this results in overly skewed distribution, so take sqrt
                 final double scale = (max - min) / findBounds.inverseCumulativeProbability(1d - Math.sqrt(1d/(max-min)));
                 return new QuantizedExtremeFactory(min, max, shape, scale, quantas);
-            } catch (Exception _)
+            } catch (Exception ignore)
             {
                 throw new IllegalArgumentException("Invalid parameter list for quantized extreme (Weibull) distribution: " + params);
             }
@@ -305,7 +310,7 @@ public class OptionDistribution extends Option
                 final long min = parseLong(bounds[0]);
                 final long max = parseLong(bounds[1]);
                 return new UniformFactory(min, max);
-            } catch (Exception _)
+            } catch (Exception ignore)
             {
                 throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params);
             }
@@ -324,7 +329,7 @@ public class OptionDistribution extends Option
             {
                 final long key = parseLong(params.get(0));
                 return new FixedFactory(key);
-            } catch (Exception _)
+            } catch (Exception ignore)
             {
                 throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
index 6d11012..ad89a5b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
@@ -182,6 +182,11 @@ abstract class OptionMulti extends Option
         {
             return !options.isEmpty();
         }
+
+        boolean present()
+        {
+            return !options.isEmpty();
+        }
     }
 
     List<Option> optionsSetByUser()
@@ -197,7 +202,7 @@ abstract class OptionMulti extends Option
     {
         List<Option> r = new ArrayList<>();
         for (Option option : delegate.options())
-            if (!option.setByUser() && option.happy())
+            if (!option.setByUser() && option.present())
                 r.add(option);
         return r;
     }
@@ -210,4 +215,12 @@ abstract class OptionMulti extends Option
         return false;
     }
 
+    boolean present()
+    {
+        for (Option option : delegate.options())
+            if (option.present())
+                return true;
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
index 416f045..67cd1a9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
@@ -124,6 +124,11 @@ public class OptionRatioDistribution extends Option
         return delegate.setByUser();
     }
 
+    boolean present()
+    {
+        return delegate.present();
+    }
+
     @Override
     public String shortDisplay()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
index 8b65587..8d1a6fc 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
@@ -81,7 +81,7 @@ class OptionReplication extends OptionMulti
                         throw new IllegalArgumentException(clazz + " is not a replication strategy");
                     strategy = fullname;
                     break;
-                } catch (Exception _)
+                } catch (Exception ignore)
                 {
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
index e8b45ec..60b4c09 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
@@ -27,18 +27,27 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
+import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 
 // Generic command settings - common to read/write/etc
 public abstract class SettingsCommand implements Serializable
 {
 
+    public static enum TruncateWhen
+    {
+        NEVER, ONCE, ALWAYS
+    }
+
     public final Command type;
     public final long count;
     public final long duration;
     public final TimeUnit durationUnits;
     public final boolean noWarmup;
+    public final TruncateWhen truncate;
     public final ConsistencyLevel consistencyLevel;
     public final double targetUncertainty;
     public final int minimumUncertaintyMeasurements;
@@ -60,6 +69,8 @@ public abstract class SettingsCommand implements Serializable
         this.type = type;
         this.consistencyLevel = ConsistencyLevel.valueOf(options.consistencyLevel.value().toUpperCase());
         this.noWarmup = options.noWarmup.setByUser();
+        this.truncate = TruncateWhen.valueOf(options.truncate.value().toUpperCase());
+
         if (count != null)
         {
             this.count = OptionDistribution.parseLong(count.count.value());
@@ -107,7 +118,8 @@ public abstract class SettingsCommand implements Serializable
     static abstract class Options extends GroupedOptions
     {
         final OptionSimple noWarmup = new OptionSimple("no-warmup", "", null, "Do not warmup the process", false);
-        final OptionSimple consistencyLevel = new OptionSimple("cl=", "ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|SERIAL|LOCAL_SERIAL|LOCAL_ONE", "ONE", "Consistency level to use", false);
+        final OptionSimple truncate = new OptionSimple("truncate=", "never|once|always", "never", "Truncate the table: never, before performing any work, or before each iteration", false);
+        final OptionSimple consistencyLevel = new OptionSimple("cl=", "ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY", "ONE", "Consistency level to use", false);
     }
 
     static class Count extends Options
@@ -116,7 +128,7 @@ public abstract class SettingsCommand implements Serializable
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(count, noWarmup, consistencyLevel);
+            return Arrays.asList(count, noWarmup, truncate, consistencyLevel);
         }
     }
 
@@ -126,7 +138,7 @@ public abstract class SettingsCommand implements Serializable
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(duration, noWarmup, consistencyLevel);
+            return Arrays.asList(duration, noWarmup, truncate, consistencyLevel);
         }
     }
 
@@ -138,10 +150,26 @@ public abstract class SettingsCommand implements Serializable
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(uncertainty, minMeasurements, maxMeasurements, noWarmup, consistencyLevel);
+            return Arrays.asList(uncertainty, minMeasurements, maxMeasurements, noWarmup, truncate, consistencyLevel);
         }
     }
 
+    public abstract void truncateTables(StressSettings settings);
+
+    protected void truncateTables(StressSettings settings, String ks, String ... tables)
+    {
+        JavaDriverClient client = settings.getJavaDriverClient(false);
+        assert settings.command.truncate != SettingsCommand.TruncateWhen.NEVER;
+        for (String table : tables)
+        {
+            String cql = String.format("TRUNCATE %s.%s", ks, table);
+            client.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ONE);
+        }
+        System.out.println(String.format("Truncated %s.%s. Sleeping %ss for propagation.",
+                                         ks, Arrays.toString(tables), settings.node.nodes.size()));
+        Uninterruptibles.sleepUninterruptibly(settings.node.nodes.size(), TimeUnit.SECONDS);
+    }
+
     // CLI Utility Methods
 
     static SettingsCommand get(Map<String, String[]> clArgs)
@@ -171,18 +199,6 @@ public abstract class SettingsCommand implements Serializable
         return null;
     }
 
-/*    static SettingsCommand build(Command type, String[] params)
-    {
-        GroupedOptions options = GroupedOptions.select(params, new Count(), new Duration(), new Uncertainty());
-        if (options == null)
-        {
-            printHelp(type);
-            System.out.println("Invalid " + type + " options provided, see output for valid options");
-            System.exit(1);
-        }
-        return new SettingsCommand(type, options);
-    }*/
-
     static void printHelp(Command type)
     {
         printHelp(type.toString().toLowerCase());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
index ee1958b..83f444c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.stress.operations.FixedOpDistribution;
 import org.apache.cassandra.stress.operations.OpDistribution;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
 import org.apache.cassandra.stress.operations.predefined.PredefinedOperation;
-import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.stress.util.Timing;
 
 // Settings unique to the mixed command type
 public class SettingsCommandPreDefined extends SettingsCommand
@@ -51,9 +51,10 @@ public class SettingsCommandPreDefined extends SettingsCommand
         final SeedManager seeds = new SeedManager(settings);
         return new OpDistributionFactory()
         {
-            public OpDistribution get(Timer timer)
+            public OpDistribution get(Timing timing, int sampleCount)
             {
-                return new FixedOpDistribution(PredefinedOperation.operation(type, timer, newGenerator(settings), seeds, settings, add));
+                return new FixedOpDistribution(PredefinedOperation.operation(type, timing.newTimer(type.toString(), sampleCount),
+                                               newGenerator(settings), seeds, settings, add));
             }
 
             public String desc()
@@ -108,6 +109,11 @@ public class SettingsCommandPreDefined extends SettingsCommand
 
     }
 
+    public void truncateTables(StressSettings settings)
+    {
+        truncateTables(settings, settings.schema.keyspace, "standard1", "counter1", "counter3");
+    }
+
     // CLI utility methods
 
     public static SettingsCommandPreDefined build(Command type, String[] params)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
index d5b221c..d4e43cf 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
@@ -89,6 +89,11 @@ public class SettingsCommandUser extends SettingsCommand
         };
     }
 
+    public void truncateTables(StressSettings settings)
+    {
+        profile.truncateTable(settings);
+    }
+
     static final class Options extends GroupedOptions
     {
         final SettingsCommand.Options parent;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
index ff625a8..88e8020 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
@@ -39,6 +39,7 @@ public final class Timer
     private int opCount;
 
     // aggregate info
+    private long errorCount;
     private long partitionCount;
     private long rowCount;
     private long total;
@@ -78,7 +79,7 @@ public final class Timer
         return finalReport == null;
     }
 
-    public void stop(long partitionCount, long rowCount)
+    public void stop(long partitionCount, long rowCount, boolean error)
     {
         maybeReport();
         long now = System.nanoTime();
@@ -94,6 +95,8 @@ public final class Timer
         opCount += 1;
         this.partitionCount += partitionCount;
         this.rowCount += rowCount;
+        if (error)
+            this.errorCount++;
         upToDateAsOf = now;
     }
 
@@ -108,14 +111,15 @@ public final class Timer
                 (       new SampleOfLongs(Arrays.copyOf(sample, index(opCount)), p(opCount)),
                         new SampleOfLongs(Arrays.copyOfRange(sample, index(opCount), Math.min(opCount, sample.length)), p(opCount) - 1)
                 );
-        final TimingInterval report = new TimingInterval(lastSnap, upToDateAsOf, max, maxStart, max, partitionCount, rowCount, total, opCount,
-                SampleOfLongs.merge(rnd, sampleLatencies, Integer.MAX_VALUE));
+        final TimingInterval report = new TimingInterval(lastSnap, upToDateAsOf, max, maxStart, max, partitionCount,
+                rowCount, total, opCount, errorCount, SampleOfLongs.merge(rnd, sampleLatencies, Integer.MAX_VALUE));
         // reset counters
         opCount = 0;
         partitionCount = 0;
         rowCount = 0;
         total = 0;
         max = 0;
+        errorCount = 0;
         lastSnap = upToDateAsOf;
         return report;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
index 9464b19..403bee0 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
@@ -21,9 +21,7 @@ package org.apache.cassandra.stress.util;
  */
 
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -36,9 +34,10 @@ import java.util.concurrent.TimeUnit;
 // metrics calculated over the interval are accurate
 public class Timing
 {
-
-    private final CopyOnWriteArrayList<Timer> timers = new CopyOnWriteArrayList<>();
-    private volatile TimingInterval history;
+    // concurrency: this should be ok as the consumers are created serially by StressAction.run / warmup
+    // Probably the CopyOnWriteArrayList could be changed to an ordinary list as well.
+    private final Map<String, List<Timer>> timers = new TreeMap<>();
+    private volatile TimingIntervals history;
     private final int historySampleCount;
     private final int reportSampleCount;
     private boolean done;
@@ -54,22 +53,31 @@ public class Timing
     public static class TimingResult<E>
     {
         public final E extra;
-        public final TimingInterval timing;
-        public TimingResult(E extra, TimingInterval timing)
+        public final TimingIntervals intervals;
+        public TimingResult(E extra, TimingIntervals intervals)
         {
             this.extra = extra;
-            this.timing = timing;
+            this.intervals = intervals;
         }
     }
 
     public <E> TimingResult<E> snap(Callable<E> call) throws InterruptedException
     {
-        final Timer[] timers = this.timers.toArray(new Timer[0]);
-        final CountDownLatch ready = new CountDownLatch(timers.length);
-        for (int i = 0 ; i < timers.length ; i++)
+        // Count up total # of timers
+        int timerCount = 0;
+        for (List<Timer> timersForOperation : timers.values())
         {
-            final Timer timer = timers[i];
-            timer.requestReport(ready);
+            timerCount += timersForOperation.size();
+        }
+        final CountDownLatch ready = new CountDownLatch(timerCount);
+
+        // request reports
+        for (List <Timer> timersForOperation : timers.values())
+        {
+            for(Timer timer : timersForOperation)
+            {
+                timer.requestReport(ready);
+            }
         }
 
         E extra;
@@ -86,34 +94,48 @@ public class Timing
 
         // TODO fail gracefully after timeout if a thread is stuck
         if (!ready.await(5L, TimeUnit.MINUTES))
+        {
             throw new RuntimeException("Timed out waiting for a timer thread - seems one got stuck. Check GC/Heap size");
+        }
 
         boolean done = true;
+
         // reports have been filled in by timer threadCount, so merge
-        List<TimingInterval> intervals = new ArrayList<>();
-        for (Timer timer : timers)
+        Map<String, TimingInterval> intervals = new TreeMap<>();
+        for (Map.Entry<String, List<Timer>> entry : timers.entrySet())
         {
-            intervals.add(timer.report);
-            done &= !timer.running();
+            List<TimingInterval> operationIntervals = new ArrayList<>();
+            for (Timer timer : entry.getValue())
+            {
+                operationIntervals.add(timer.report);
+                done &= !timer.running();
+            }
+
+            intervals.put(entry.getKey(), TimingInterval.merge(operationIntervals, reportSampleCount,
+                                                              history.get(entry.getKey()).endNanos()));
         }
 
+        TimingIntervals result = new TimingIntervals(intervals);
         this.done = done;
-        TimingResult<E> result = new TimingResult<>(extra, TimingInterval.merge(intervals, reportSampleCount, history.endNanos()));
-        history = TimingInterval.merge(Arrays.asList(result.timing, history), historySampleCount, history.startNanos());
-        return result;
+        history = history.merge(result, historySampleCount, history.startNanos());
+        return new TimingResult<>(extra, result);
     }
 
-    // build a new timer and add it to the set of running timers
-    public Timer newTimer(int sampleCount)
+    // build a new timer and add it to the set of running timers.
+    public Timer newTimer(String opType, int sampleCount)
     {
         final Timer timer = new Timer(sampleCount);
-        timers.add(timer);
+
+        if (!timers.containsKey(opType))
+            timers.put(opType, new ArrayList<Timer>());
+
+        timers.get(opType).add(timer);
         return timer;
     }
 
     public void start()
     {
-        history = new TimingInterval(System.nanoTime());
+        history = new TimingIntervals(timers.keySet());
     }
 
     public boolean done()
@@ -121,9 +143,8 @@ public class Timing
         return done;
     }
 
-    public TimingInterval getHistory()
+    public TimingIntervals getHistory()
     {
         return history;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
index 065ea52..6be71c8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
@@ -20,10 +20,7 @@ package org.apache.cassandra.stress.util;
  * 
  */
 
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 
 // represents measurements taken over an interval of time
@@ -42,18 +39,28 @@ public final class TimingInterval
     public final long partitionCount;
     public final long rowCount;
     public final long operationCount;
+    public final long errorCount;
 
     final SampleOfLongs sample;
 
+    public String toString()
+    {
+        return String.format("Start: %d end: %d maxLatency: %d pauseLength: %d pauseStart: %d totalLatency: %d" +
+                             " pCount: %d rcount: %d opCount: %d errors: %d", start, end, maxLatency, pauseLength,
+                             pauseStart, totalLatency, partitionCount, rowCount, operationCount, errorCount);
+    }
+
     TimingInterval(long time)
     {
         start = end = time;
         maxLatency = totalLatency = 0;
-        partitionCount = rowCount = operationCount = 0;
+        partitionCount = rowCount = operationCount = errorCount = 0;
         pauseStart = pauseLength = 0;
         sample = new SampleOfLongs(new long[0], 1d);
     }
-    TimingInterval(long start, long end, long maxLatency, long pauseStart, long pauseLength, long partitionCount, long rowCount, long totalLatency, long operationCount, SampleOfLongs sample)
+
+    TimingInterval(long start, long end, long maxLatency, long pauseStart, long pauseLength, long partitionCount,
+                   long rowCount, long totalLatency, long operationCount, long errorCount, SampleOfLongs sample)
     {
         this.start = start;
         this.end = Math.max(end, start);
@@ -61,6 +68,7 @@ public final class TimingInterval
         this.partitionCount = partitionCount;
         this.rowCount = rowCount;
         this.totalLatency = totalLatency;
+        this.errorCount = errorCount;
         this.operationCount = operationCount;
         this.pauseStart = pauseStart;
         this.pauseLength = pauseLength;
@@ -68,33 +76,41 @@ public final class TimingInterval
     }
 
     // merge multiple timer intervals together
-    static TimingInterval merge(List<TimingInterval> intervals, int maxSamples, long start)
+    static TimingInterval merge(Iterable<TimingInterval> intervals, int maxSamples, long start)
     {
         ThreadLocalRandom rnd = ThreadLocalRandom.current();
-        long operationCount = 0, partitionCount = 0, rowCount = 0;
+        long operationCount = 0, partitionCount = 0, rowCount = 0, errorCount = 0;
         long maxLatency = 0, totalLatency = 0;
         List<SampleOfLongs> latencies = new ArrayList<>();
         long end = 0;
         long pauseStart = 0, pauseEnd = Long.MAX_VALUE;
         for (TimingInterval interval : intervals)
         {
-            end = Math.max(end, interval.end);
-            operationCount += interval.operationCount;
-            maxLatency = Math.max(interval.maxLatency, maxLatency);
-            totalLatency += interval.totalLatency;
-            partitionCount += interval.partitionCount;
-            rowCount += interval.rowCount;
-            latencies.addAll(Arrays.asList(interval.sample));
-            if (interval.pauseLength > 0)
+            if (interval != null)
             {
-                pauseStart = Math.max(pauseStart, interval.pauseStart);
-                pauseEnd = Math.min(pauseEnd, interval.pauseStart + interval.pauseLength);
+                end = Math.max(end, interval.end);
+                operationCount += interval.operationCount;
+                maxLatency = Math.max(interval.maxLatency, maxLatency);
+                totalLatency += interval.totalLatency;
+                partitionCount += interval.partitionCount;
+                rowCount += interval.rowCount;
+                errorCount += interval.errorCount;
+                latencies.addAll(Arrays.asList(interval.sample));
+                if (interval.pauseLength > 0)
+                {
+                    pauseStart = Math.max(pauseStart, interval.pauseStart);
+                    pauseEnd = Math.min(pauseEnd, interval.pauseStart + interval.pauseLength);
+                }
             }
         }
-        if (pauseEnd < pauseStart)
+
+        if (pauseEnd < pauseStart || pauseStart <= 0)
+        {
             pauseEnd = pauseStart = 0;
-        return new TimingInterval(start, end, maxLatency, pauseStart, pauseEnd - pauseStart, partitionCount, rowCount, totalLatency, operationCount,
-                SampleOfLongs.merge(rnd, latencies, maxSamples));
+        }
+
+        return new TimingInterval(start, end, maxLatency, pauseStart, pauseEnd - pauseStart, partitionCount, rowCount,
+                                  totalLatency, operationCount, errorCount, SampleOfLongs.merge(rnd, latencies, maxSamples));
 
     }
 
@@ -128,11 +144,6 @@ public final class TimingInterval
         return maxLatency * 0.000001d;
     }
 
-    public long runTime()
-    {
-        return (end - start) / 1000000;
-    }
-
     public double medianLatency()
     {
         return sample.medianLatency();
@@ -144,19 +155,48 @@ public final class TimingInterval
         return sample.rankLatency(rank);
     }
 
-    public final long endNanos()
+    public long runTime()
     {
-        return end;
+        return (end - start) / 1000000;
     }
 
-    public final long endMillis()
+    public final long endNanos()
     {
-        return end / 1000000;
+        return end;
     }
 
     public long startNanos()
     {
         return start;
     }
-}
+
+    public static enum TimingParameter
+    {
+        OPRATE, ROWRATE, ADJROWRATE, PARTITIONRATE, MEANLATENCY, MAXLATENCY, MEDIANLATENCY, RANKLATENCY,
+        ERRORCOUNT, PARTITIONCOUNT
+    }
+
+    String getStringValue(TimingParameter value)
+    {
+        return getStringValue(value, Float.NaN);
+    }
+
+    String getStringValue(TimingParameter value, float rank)
+    {
+        switch (value)
+        {
+            case OPRATE:         return String.format("%.0f", opRate());
+            case ROWRATE:        return String.format("%.0f", rowRate());
+            case ADJROWRATE:     return String.format("%.0f", adjustedRowRate());
+            case PARTITIONRATE:  return String.format("%.0f", partitionRate());
+            case MEANLATENCY:    return String.format("%.1f", meanLatency());
+            case MAXLATENCY:     return String.format("%.1f", maxLatency());
+            case MEDIANLATENCY:  return String.format("%.1f", medianLatency());
+            case RANKLATENCY:    return String.format("%.1f", rankLatency(rank));
+            case ERRORCOUNT:     return String.format("%d", errorCount);
+            case PARTITIONCOUNT: return String.format("%d", partitionCount);
+            default:             throw new IllegalStateException();
+        }
+    }
+ }
 


Mime
View raw message