cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] cassandra git commit: Introduce HdrHistogram and response/service/wait separation to stress tool
Date Fri, 27 May 2016 16:00:16 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk d9b192e33 -> 89f275c65


http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index fa36716..668518c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,53 +8,82 @@ package org.apache.cassandra.stress;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import java.io.FileNotFoundException;
 import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
 
-import org.apache.cassandra.stress.util.*;
-import org.apache.commons.lang3.time.DurationFormatUtils;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
 import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JmxCollector;
+import org.apache.cassandra.stress.util.Timing;
+import org.apache.cassandra.stress.util.TimingInterval;
+import org.apache.cassandra.stress.util.TimingIntervals;
+import org.apache.cassandra.stress.util.Uncertainty;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang3.time.DurationFormatUtils;
 
 public class StressMetrics
 {
 
-    private static final ThreadFactory tf = new NamedThreadFactory("StressMetrics");
-
     private final PrintStream output;
     private final Thread thread;
-    private volatile boolean stop = false;
-    private volatile boolean cancelled = false;
     private final Uncertainty rowRateUncertainty = new Uncertainty();
     private final CountDownLatch stopped = new CountDownLatch(1);
     private final Timing timing;
     private final Callable<JmxCollector.GcStats> gcStatsCollector;
+    private final HistogramLogWriter histogramWriter;
+    private final long epochNs = System.nanoTime();
+    private final long epochMs = System.currentTimeMillis();
+
     private volatile JmxCollector.GcStats totalGcStats;
-    private final StressSettings settings;
+
+    private volatile boolean stop = false;
+    private volatile boolean cancelled = false;
 
     public StressMetrics(PrintStream output, final long logIntervalMillis, StressSettings settings)
     {
         this.output = output;
-        this.settings = settings;
+        if(settings.log.hdrFile != null)
+        {
+            try
+            {
+                histogramWriter = new HistogramLogWriter(settings.log.hdrFile);
+                histogramWriter.outputComment("Logging op latencies for Cassandra Stress");
+                histogramWriter.outputLogFormatVersion();
+                histogramWriter.outputBaseTime(epochMs);
+                histogramWriter.setBaseTime(epochMs);
+                histogramWriter.outputStartTime(epochMs);
+                histogramWriter.outputLegend();
+            }
+            catch (FileNotFoundException e)
+            {
+                throw new IllegalArgumentException(e);
+            }
+        }
+        else
+        {
+            histogramWriter = null;
+        }
         Callable<JmxCollector.GcStats> gcStatsCollector;
         totalGcStats = new JmxCollector.GcStats(0);
         try
@@ -78,10 +107,10 @@ public class StressMetrics
             };
         }
         this.gcStatsCollector = gcStatsCollector;
-        this.timing = new Timing(settings.samples.historyCount, settings.samples.reportCount);
+        this.timing = new Timing(settings.rate.isFixed);
 
         printHeader("", output);
-        thread = tf.newThread(new Runnable()
+        thread = new Thread(new Runnable()
         {
             @Override
             public void run()
@@ -125,6 +154,7 @@ public class StressMetrics
                 }
             }
         });
+        thread.setName("StressMetrics");
     }
 
     public void start()
@@ -156,15 +186,20 @@ public class StressMetrics
     {
         Timing.TimingResult<JmxCollector.GcStats> result = timing.snap(gcStatsCollector);
         totalGcStats = JmxCollector.GcStats.aggregate(Arrays.asList(totalGcStats, result.extra));
-        TimingInterval current = result.intervals.combine(settings.samples.reportCount);
-        TimingInterval history = timing.getHistory().combine(settings.samples.historyCount);
+        TimingInterval current = result.intervals.combine();
+        TimingInterval history = timing.getHistory().combine();
         rowRateUncertainty.update(current.adjustedRowRate());
-        if (current.operationCount != 0)
+        if (current.operationCount() != 0)
         {
             if (result.intervals.intervals().size() > 1)
             {
                 for (Map.Entry<String, TimingInterval> type : result.intervals.intervals().entrySet())
-                    printRow("", type.getKey(), type.getValue(), timing.getHistory().get(type.getKey()), result.extra, rowRateUncertainty, output);
+                {
+                    final String opName = type.getKey();
+                    final TimingInterval opInterval = type.getValue();
+                    printRow("", opName, opInterval, timing.getHistory().get(type.getKey()), result.extra, rowRateUncertainty, output);
+                    logHistograms(opName, opInterval);
+                }
             }
 
             printRow("", "total", current, history, result.extra, rowRateUncertainty, output);
@@ -174,6 +209,30 @@ public class StressMetrics
     }
 
 
+    private void logHistograms(String opName, TimingInterval opInterval)
+    {
+        if (histogramWriter == null)
+            return;
+        final long startNs = opInterval.startNanos();
+        final long endNs = opInterval.endNanos();
+
+        logHistogram(opName + "-st", startNs, endNs, opInterval.serviceTime());
+        logHistogram(opName + "-rt", startNs, endNs, opInterval.responseTime());
+        logHistogram(opName + "-wt", startNs, endNs, opInterval.waitTime());
+    }
+
+    private void logHistogram(String opName, final long startNs, final long endNs, final Histogram histogram)
+    {
+        if (histogram.getTotalCount() != 0)
+        {
+            histogram.setTag(opName);
+            histogram.setStartTimeStamp(epochMs + NANOSECONDS.toMillis(startNs - epochNs));
+            histogram.setEndTimeStamp(epochMs + NANOSECONDS.toMillis(endNs - epochNs));
+            histogramWriter.outputIntervalHistogram(histogram);
+        }
+    }
+
+
     // PRINT FORMATTING
 
     public static final String HEADFORMAT = "%-10s%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%7s,%8s,%8s,%8s,%8s";
@@ -190,17 +249,17 @@ public class StressMetrics
     {
         output.println(prefix + String.format(ROWFORMAT,
                 type + ",",
-                total.operationCount,
+                total.operationCount(),
                 interval.opRate(),
                 interval.partitionRate(),
                 interval.rowRate(),
-                interval.meanLatency(),
-                interval.medianLatency(),
-                interval.rankLatency(0.95f),
-                interval.rankLatency(0.99f),
-                interval.rankLatency(0.999f),
-                interval.maxLatency(),
-                total.runTime() / 1000f,
+                interval.meanLatencyMs(),
+                interval.medianLatencyMs(),
+                interval.latencyAtPercentileMs(95.0),
+                interval.latencyAtPercentileMs(99.0),
+                interval.latencyAtPercentileMs(99.9),
+                interval.maxLatencyMs(),
+                total.runTimeMs() / 1000f,
                 opRateUncertainty.getUncertainty(),
                 interval.errorCount,
                 gcStats.count,
@@ -217,16 +276,16 @@ public class StressMetrics
         output.println("Results:");
 
         TimingIntervals opHistory = timing.getHistory();
-        TimingInterval history = opHistory.combine(settings.samples.historyCount);
+        TimingInterval history = opHistory.combine();
         output.println(String.format("Op rate                   : %,8.0f op/s  %s", history.opRate(), opHistory.opRates()));
         output.println(String.format("Partition rate            : %,8.0f pk/s  %s", history.partitionRate(), opHistory.partitionRates()));
         output.println(String.format("Row rate                  : %,8.0f row/s %s", history.rowRate(), opHistory.rowRates()));
-        output.println(String.format("Latency mean              : %6.1f ms %s", history.meanLatency(), opHistory.meanLatencies()));
-        output.println(String.format("Latency median            : %6.1f ms %s", history.medianLatency(), opHistory.medianLatencies()));
-        output.println(String.format("Latency 95th percentile   : %6.1f ms %s", history.rankLatency(.95f), opHistory.rankLatencies(0.95f)));
-        output.println(String.format("Latency 99th percentile   : %6.1f ms %s", history.rankLatency(0.99f), opHistory.rankLatencies(0.99f)));
-        output.println(String.format("Latency 99.9th percentile : %6.1f ms %s", history.rankLatency(0.999f), opHistory.rankLatencies(0.999f)));
-        output.println(String.format("Latency max               : %6.1f ms %s", history.maxLatency(), opHistory.maxLatencies()));
+        output.println(String.format("Latency mean              : %6.1f ms %s", history.meanLatencyMs(), opHistory.meanLatencies()));
+        output.println(String.format("Latency median            : %6.1f ms %s", history.medianLatencyMs(), opHistory.medianLatencies()));
+        output.println(String.format("Latency 95th percentile   : %6.1f ms %s", history.latencyAtPercentileMs(95.0), opHistory.latenciesAtPercentile(95.0)));
+        output.println(String.format("Latency 99th percentile   : %6.1f ms %s", history.latencyAtPercentileMs(99.0), opHistory.latenciesAtPercentile(99.0)));
+        output.println(String.format("Latency 99.9th percentile : %6.1f ms %s", history.latencyAtPercentileMs(99.9), opHistory.latenciesAtPercentile(99.9)));
+        output.println(String.format("Latency max               : %6.1f ms %s", history.maxLatencyMs(), opHistory.maxLatencies()));
         output.println(String.format("Total partitions          : %,10d %s",   history.partitionCount, opHistory.partitionCounts()));
         output.println(String.format("Total errors              : %,10d %s",   history.errorCount, opHistory.errorCounts()));
         output.println(String.format("Total GC count            : %,1.0f", totalGcStats.count));
@@ -235,11 +294,11 @@ public class StressMetrics
         output.println(String.format("Avg GC time               : %,6.1f ms", totalGcStats.summs / totalGcStats.count));
         output.println(String.format("StdDev GC time            : %,6.1f ms", totalGcStats.sdvms));
         output.println("Total operation time      : " + DurationFormatUtils.formatDuration(
-                history.runTime(), "HH:mm:ss", true));
+                history.runTimeMs(), "HH:mm:ss", true));
         output.println(""); // Newline is important here to separate the aggregates section from the END or the next stress iteration
     }
 
-    public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out, int historySampleCount)
+    public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out)
     {
         int idLen = 0;
         for (String id : ids)
@@ -258,7 +317,7 @@ public class StressMetrics
                          summarise.get(i).rowRateUncertainty,
                          out);
             }
-            TimingInterval hist = summarise.get(i).timing.getHistory().combine(historySampleCount);
+            TimingInterval hist = summarise.get(i).timing.getHistory().combine();
             printRow(String.format(formatstr, ids.get(i)),
                     "total",
                     hist,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
index f2616cf..9a3522c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.operations;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.operations;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -37,13 +37,8 @@ public class FixedOpDistribution implements OpDistribution
         return operation;
     }
 
-    public void initTimers()
-    {
-        operation.timer.init();
-    }
-
     public void closeTimers()
     {
-        operation.timer.close();
+        operation.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
index e09300a..33a0c93 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.operations;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.operations;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -28,6 +28,5 @@ public interface OpDistribution
 
     Operation next();
 
-    public void initTimers();
     public void closeTimers();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
index 5fbb0f9..14e6dfb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.operations;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.operations;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -25,7 +25,7 @@ import org.apache.cassandra.stress.util.Timing;
 
 public interface OpDistributionFactory
 {
-    public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup);
+    public OpDistribution get(Timing timing, boolean isWarmup);
     public String desc();
     Iterable<OpDistributionFactory> each();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
index 45c36f2..784b2ac 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
@@ -77,14 +77,14 @@ public abstract class PartitionOperation extends Operation
         this.spec = spec;
     }
 
-    public boolean ready(WorkManager permits, RateLimiter rateLimiter)
+    public int ready(WorkManager permits)
     {
         int partitionCount = (int) spec.partitionCount.next();
         if (partitionCount <= 0)
-            return false;
+            return 0;
         partitionCount = permits.takePermits(partitionCount);
         if (partitionCount <= 0)
-            return false;
+            return 0;
 
         int i = 0;
         boolean success = true;
@@ -105,11 +105,8 @@ public abstract class PartitionOperation extends Operation
         }
         partitionCount = i;
 
-        if (rateLimiter != null)
-            rateLimiter.acquire(partitionCount);
-
         partitions = partitionCache.subList(0, partitionCount);
-        return !partitions.isEmpty();
+        return partitions.size();
     }
 
     protected boolean reset(Seed seed, PartitionIterator iterator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
index 9698421..fc0229e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.operations;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.operations;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -52,19 +52,11 @@ public class SampledOpDistribution implements OpDistribution
         return cur;
     }
 
-    public void initTimers()
-    {
-        for (Pair<Operation, Double> op : operations.getPmf())
-        {
-            op.getFirst().timer.init();
-        }
-    }
-
     public void closeTimers()
     {
         for (Pair<Operation, Double> op : operations.getPmf())
         {
-            op.getFirst().timer.close();
+            op.getFirst().close();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index a10585d..0b206f9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.operations;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.operations;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -47,13 +47,13 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
     protected abstract List<? extends Operation> get(Timer timer, PartitionGenerator generator, T key, boolean isWarmup);
     protected abstract PartitionGenerator newGenerator();
 
-    public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
+    public OpDistribution get(Timing timing, boolean isWarmup)
     {
         PartitionGenerator generator = newGenerator();
         List<Pair<Operation, Double>> operations = new ArrayList<>();
         for (Map.Entry<T, Double> ratio : ratios.entrySet())
         {
-            List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString(), sampleCount),
+            List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString()),
                                                 generator, ratio.getKey(), isWarmup);
             for (Operation op : ops)
                 operations.add(new Pair<>(op, ratio.getValue() / ops.size()));
@@ -76,9 +76,9 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
         {
             out.add(new OpDistributionFactory()
             {
-                public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
+                public OpDistribution get(Timing timing, boolean isWarmup)
                 {
-                    List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount),
+                    List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString()),
                                                                                           newGenerator(),
                                                                                           ratio.getKey(),
                                                                                           isWarmup);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
index 60a6c48..198f1f5 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
@@ -248,18 +248,16 @@ public class TokenRangeQuery extends Operation
         timeWithRetry(new ThriftRun(client));
     }
 
-    public boolean ready(WorkManager workManager, RateLimiter rateLimiter)
+    public int ready(WorkManager workManager)
     {
         tokenRangeIterator.update();
 
         if (tokenRangeIterator.exhausted() && currentState.get() == null)
-            return false;
+            return 0;
 
         int numLeft = workManager.takePermits(1);
-        if (rateLimiter != null && numLeft > 0 )
-            rateLimiter.acquire(numLeft);
 
-        return numLeft > 0;
+        return numLeft > 0 ? 1 : 0;
     }
 
     public String key()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
index 6d8e184..36284ab 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.settings;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.settings;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -32,7 +32,6 @@ public enum CliOption
     RATE("Thread count, rate limit or automatic mode (default is auto)", SettingsRate.helpPrinter()),
     MODE("Thrift or CQL with options", SettingsMode.helpPrinter()),
     ERRORS("How to handle errors when encountered during stress", SettingsErrors.helpPrinter()),
-    SAMPLE("Specify the number of samples to collect for measuring latency", SettingsSamples.helpPrinter()),
     SCHEMA("Replication settings, compression, compaction, etc.", SettingsSchema.helpPrinter()),
     NODE("Nodes to connect to", SettingsNode.helpPrinter()),
     LOG("Where to log progress to, and the interval at which to do it", SettingsLog.helpPrinter()),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
index c2f2591..8e93ff6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.settings;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.settings;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -51,9 +51,9 @@ public class SettingsCommandPreDefined extends SettingsCommand
         final SeedManager seeds = new SeedManager(settings);
         return new OpDistributionFactory()
         {
-            public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
+            public OpDistribution get(Timing timing, boolean isWarmup)
             {
-                return new FixedOpDistribution(PredefinedOperation.operation(type, timing.newTimer(type.toString(), sampleCount),
+                return new FixedOpDistribution(PredefinedOperation.operation(type, timing.newTimer(type.toString()),
                                                newGenerator(settings), seeds, settings, add));
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/settings/SettingsLog.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsLog.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsLog.java
index e5f155b..602435c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsLog.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsLog.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.settings;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.settings;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -37,6 +37,7 @@ public class SettingsLog implements Serializable
 
     public final boolean noSummary;
     public final File file;
+    public final File hdrFile;
     public final int intervalMillis;
     public final Level level;
 
@@ -48,7 +49,10 @@ public class SettingsLog implements Serializable
             file = new File(options.outputFile.value());
         else
             file = null;
-
+        if (options.hdrOutputFile.setByUser())
+            hdrFile = new File(options.hdrOutputFile.value());
+        else
+            hdrFile = null;
         String interval = options.interval.value();
         if (interval.endsWith("ms"))
             intervalMillis = Integer.parseInt(interval.substring(0, interval.length() - 2));
@@ -78,13 +82,14 @@ public class SettingsLog implements Serializable
     {
         final OptionSimple noSummmary = new OptionSimple("no-summary", "", null, "Disable printing of aggregate statistics at the end of a test", false);
         final OptionSimple outputFile = new OptionSimple("file=", ".*", null, "Log to a file", false);
+        final OptionSimple hdrOutputFile = new OptionSimple("hdrfile=", ".*", null, "Log to a file", false);
         final OptionSimple interval = new OptionSimple("interval=", "[0-9]+(ms|s|)", "1s", "Log progress every <value> seconds or milliseconds", false);
         final OptionSimple level = new OptionSimple("level=", "(minimal|normal|verbose)", "normal", "Logging level (minimal, normal or verbose)", false);
 
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(level, noSummmary, outputFile, interval);
+            return Arrays.asList(level, noSummmary, outputFile, hdrOutputFile, interval);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
index 0486678..7a5995b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.settings;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.settings;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -33,14 +33,22 @@ public class SettingsRate implements Serializable
     public final int minThreads;
     public final int maxThreads;
     public final int threadCount;
-    public final int opRateTargetPerSecond;
+    public final int opsPerSecond;
+    public final boolean isFixed;
 
     public SettingsRate(ThreadOptions options)
     {
         auto = false;
         threadCount = Integer.parseInt(options.threads.value());
-        String rateOpt = options.rate.value();
-        opRateTargetPerSecond = Integer.parseInt(rateOpt.substring(0, rateOpt.length() - 2));
+        String throttleOpt = options.throttle.value();
+        String fixedOpt = options.fixed.value();
+        int throttle = Integer.parseInt(throttleOpt.substring(0, throttleOpt.length() - 2));
+        int fixed = Integer.parseInt(fixedOpt.substring(0, fixedOpt.length() - 2));
+        if(throttle != 0 && fixed != 0)
+            throw new IllegalArgumentException("can't have both fixed and throttle set, choose one.");
+        opsPerSecond = Math.max(fixed, throttle);
+        isFixed = (opsPerSecond == fixed);
+
         minThreads = -1;
         maxThreads = -1;
     }
@@ -51,7 +59,8 @@ public class SettingsRate implements Serializable
         this.minThreads = Integer.parseInt(auto.minThreads.value());
         this.maxThreads = Integer.parseInt(auto.maxThreads.value());
         this.threadCount = -1;
-        this.opRateTargetPerSecond = 0;
+        this.opsPerSecond = 0;
+        isFixed = false;
     }
 
 
@@ -73,12 +82,13 @@ public class SettingsRate implements Serializable
     private static final class ThreadOptions extends GroupedOptions
     {
         final OptionSimple threads = new OptionSimple("threads=", "[0-9]+", null, "run this many clients concurrently", true);
-        final OptionSimple rate = new OptionSimple("limit=", "[0-9]+/s", "0/s", "limit operations per second across all clients", false);
+        final OptionSimple throttle = new OptionSimple("throttle=", "[0-9]+/s", "0/s", "throttle operations per second across all clients to a maximum rate (or less) with no implied schedule", false);
+        final OptionSimple fixed = new OptionSimple("fixed=", "[0-9]+/s", "0/s", "expect fixed rate of operations per second across all clients with implied schedule", false);
 
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(threads, rate);
+            return Arrays.asList(threads, throttle, fixed);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSamples.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSamples.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSamples.java
deleted file mode 100644
index 7a9f484..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSamples.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.cassandra.stress.settings;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class SettingsSamples implements Serializable
-{
-
-    public final int liveCount;
-    public final int historyCount;
-    public final int reportCount;
-
-    public SettingsSamples(SampleOptions options)
-    {
-        liveCount = (int) OptionDistribution.parseLong(options.liveCount.value());
-        historyCount = (int) OptionDistribution.parseLong(options.historyCount.value());
-        reportCount = (int) OptionDistribution.parseLong(options.reportCount.value());
-    }
-
-    // Option Declarations
-
-    private static final class SampleOptions extends GroupedOptions
-    {
-        final OptionSimple historyCount = new OptionSimple("history=", "[0-9]+[bmk]?", "50K", "The number of samples to save across the whole run", false);
-        final OptionSimple liveCount = new OptionSimple("live=", "[0-9]+[bmk]?", "1M", "The number of samples to save between reports", false);
-        final OptionSimple reportCount = new OptionSimple("report=", "[0-9]+[bmk]?", "100K", "The maximum number of samples to use when building a report", false);
-
-        @Override
-        public List<? extends Option> options()
-        {
-            return Arrays.asList(historyCount, liveCount, reportCount);
-        }
-    }
-
-    // CLI Utility Methods
-
-    public static SettingsSamples get(Map<String, String[]> clArgs)
-    {
-        String[] params = clArgs.remove("-sample");
-        if (params == null)
-        {
-            return new SettingsSamples(new SampleOptions());
-        }
-        SampleOptions options = GroupedOptions.select(params, new SampleOptions());
-        if (options == null)
-        {
-            printHelp();
-            System.out.println("Invalid -sample options provided, see output for valid options");
-            System.exit(1);
-        }
-        return new SettingsSamples(options);
-    }
-
-    public static void printHelp()
-    {
-        GroupedOptions.printOptions(System.out, "-sample", new SampleOptions());
-    }
-
-    public static Runnable helpPrinter()
-    {
-        return new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                printHelp();
-            }
-        };
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index 069454d..de03737 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.settings;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.settings;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -45,7 +45,6 @@ public class StressSettings implements Serializable
     public final SettingsPopulation generate;
     public final SettingsInsert insert;
     public final SettingsColumn columns;
-    public final SettingsSamples samples;
     public final SettingsErrors errors;
     public final SettingsLog log;
     public final SettingsMode mode;
@@ -62,7 +61,6 @@ public class StressSettings implements Serializable
                           SettingsPopulation generate,
                           SettingsInsert insert,
                           SettingsColumn columns,
-                          SettingsSamples samples,
                           SettingsErrors errors,
                           SettingsLog log,
                           SettingsMode mode,
@@ -79,7 +77,6 @@ public class StressSettings implements Serializable
         this.insert = insert;
         this.generate = generate;
         this.columns = columns;
-        this.samples = samples;
         this.errors = errors;
         this.log = log;
         this.mode = mode;
@@ -273,7 +270,6 @@ public class StressSettings implements Serializable
         SettingsTokenRange tokenRange = SettingsTokenRange.get(clArgs);
         SettingsInsert insert = SettingsInsert.get(clArgs);
         SettingsColumn columns = SettingsColumn.get(clArgs);
-        SettingsSamples samples = SettingsSamples.get(clArgs);
         SettingsErrors errors = SettingsErrors.get(clArgs);
         SettingsLog log = SettingsLog.get(clArgs);
         SettingsMode mode = SettingsMode.get(clArgs);
@@ -298,7 +294,7 @@ public class StressSettings implements Serializable
             System.exit(1);
         }
 
-        return new StressSettings(command, rate, generate, insert, columns, samples, errors, log, mode, node, schema, transport, port, sendToDaemon, graph, tokenRange);
+        return new StressSettings(command, rate, generate, insert, columns, errors, log, mode, node, schema, transport, port, sendToDaemon, graph, tokenRange);
     }
 
     private static Map<String, String[]> parseMap(String[] args)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/util/SampleOfLongs.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/SampleOfLongs.java b/tools/stress/src/org/apache/cassandra/stress/util/SampleOfLongs.java
deleted file mode 100644
index ed54ee0..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/util/SampleOfLongs.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.cassandra.stress.util;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-// represents a sample of long (latencies) together with the probability of selection of each sample (i.e. the ratio of
-// samples to total number of events). This is used to ensure that, when merging, the result has samples from each
-// with equal probability
-public final class SampleOfLongs
-{
-
-    // nanos
-    final long[] sample;
-
-    // probability with which each sample was selected
-    final double p;
-
-    SampleOfLongs(long[] sample, int p)
-    {
-        this.sample = sample;
-        this.p = 1 / (float) p;
-    }
-
-    SampleOfLongs(long[] sample, double p)
-    {
-        this.sample = sample;
-        this.p = p;
-    }
-
-    static SampleOfLongs merge(Random rnd, List<SampleOfLongs> merge, int maxSamples)
-    {
-        // grab the lowest probability of selection, and normalise all samples to that
-        double targetp = 1;
-        for (SampleOfLongs sampleOfLongs : merge)
-            targetp = Math.min(targetp, sampleOfLongs.p);
-
-        // calculate how many samples we should encounter
-        int maxLength = 0;
-        for (SampleOfLongs sampleOfLongs : merge)
-            maxLength += sampleOfLongs.sample.length * (targetp / sampleOfLongs.p);
-
-        if (maxLength > maxSamples)
-        {
-            targetp *= maxSamples / (double) maxLength;
-            maxLength = maxSamples;
-        }
-
-        long[] sample = new long[maxLength];
-        int count = 0;
-        out: for (SampleOfLongs latencies : merge)
-        {
-            long[] in = latencies.sample;
-            double p = targetp / latencies.p;
-            for (int i = 0 ; i < in.length ; i++)
-            {
-                if (rnd.nextDouble() < p)
-                {
-                    sample[count++] = in[i];
-                    if (count == maxLength)
-                        break out;
-                }
-            }
-        }
-        if (count != maxLength)
-            sample = Arrays.copyOf(sample, count);
-        Arrays.sort(sample);
-        return new SampleOfLongs(sample, targetp);
-    }
-
-    public double medianLatency()
-    {
-        if (sample.length == 0)
-            return 0;
-        return sample[sample.length >> 1] * 0.000001d;
-    }
-
-    // 0 < rank < 1
-    public double rankLatency(float rank)
-    {
-        if (sample.length == 0)
-            return 0;
-        int index = (int)(rank * sample.length);
-        if (index >= sample.length)
-            index = sample.length - 1;
-        return sample[index] * 0.000001d;
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
index 88e8020..bb19bb6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.util;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,41 +8,40 @@ package org.apache.cassandra.stress.util;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
+
+import org.HdrHistogram.Histogram;
 
 // a timer - this timer must be used by a single thread, and co-ordinates with other timers by
 public final class Timer
 {
-    private ThreadLocalRandom rnd;
+    private Histogram responseTime = new Histogram(3);
+    private Histogram serviceTime = new Histogram(3);
+    private Histogram waitTime = new Histogram(3);
 
-    // in progress snap start
-    private long sampleStartNanos;
+    // event timing info
+    private long intendedTimeNs;
+    private long startTimeNs;
+    private long endTimeNs;
 
-    // each entry is present with probability 1/p(opCount) or 1/(p(opCount)-1)
-    private final long[] sample;
-    private int opCount;
 
     // aggregate info
     private long errorCount;
     private long partitionCount;
     private long rowCount;
-    private long total;
     private long max;
     private long maxStart;
     private long upToDateAsOf;
@@ -52,26 +51,11 @@ public final class Timer
     private volatile CountDownLatch reportRequest;
     volatile TimingInterval report;
     private volatile TimingInterval finalReport;
+    private final boolean isFixed;
 
-    public Timer(int sampleCount)
-    {
-        int powerOf2 = 32 - Integer.numberOfLeadingZeros(sampleCount - 1);
-        this.sample = new long[1 << powerOf2];
-    }
-
-    public void init()
-    {
-        rnd = ThreadLocalRandom.current();
-    }
-
-    public void start(){
-        // decide if we're logging this event
-        sampleStartNanos = System.nanoTime();
-    }
-
-    private int p(int index)
+    public Timer(boolean isFixed)
     {
-        return 1 + (index / sample.length);
+        this.isFixed = isFixed;
     }
 
     public boolean running()
@@ -81,46 +65,52 @@ public final class Timer
 
     public void stop(long partitionCount, long rowCount, boolean error)
     {
+        endTimeNs = System.nanoTime();
         maybeReport();
         long now = System.nanoTime();
-        long time = now - sampleStartNanos;
-        if (rnd.nextInt(p(opCount)) == 0)
-            sample[index(opCount)] = time;
-        if (time > max)
+        if (intendedTimeNs != 0)
+        {
+            long rTime = endTimeNs - intendedTimeNs;
+            responseTime.recordValue(rTime);
+            long wTime = startTimeNs - intendedTimeNs;
+            waitTime.recordValue(wTime);
+        }
+
+        long sTime = endTimeNs - startTimeNs;
+        serviceTime.recordValue(sTime);
+
+        if (sTime > max)
         {
-            maxStart = sampleStartNanos;
-            max = time;
+            maxStart = startTimeNs;
+            max = sTime;
         }
-        total += time;
-        opCount += 1;
         this.partitionCount += partitionCount;
         this.rowCount += rowCount;
         if (error)
             this.errorCount++;
         upToDateAsOf = now;
+        resetTimes();
     }
 
-    private int index(int count)
+    private void resetTimes()
     {
-        return count & (sample.length - 1);
+        intendedTimeNs = startTimeNs = endTimeNs = 0;
     }
 
     private TimingInterval buildReport()
     {
-        final List<SampleOfLongs> sampleLatencies = Arrays.asList
-                (       new SampleOfLongs(Arrays.copyOf(sample, index(opCount)), p(opCount)),
-                        new SampleOfLongs(Arrays.copyOfRange(sample, index(opCount), Math.min(opCount, sample.length)), p(opCount) - 1)
-                );
-        final TimingInterval report = new TimingInterval(lastSnap, upToDateAsOf, max, maxStart, max, partitionCount,
-                rowCount, total, opCount, errorCount, SampleOfLongs.merge(rnd, sampleLatencies, Integer.MAX_VALUE));
+        final TimingInterval report = new TimingInterval(lastSnap, upToDateAsOf, maxStart, partitionCount,
+                rowCount, errorCount, responseTime, serviceTime, waitTime, isFixed);
         // reset counters
-        opCount = 0;
         partitionCount = 0;
         rowCount = 0;
-        total = 0;
         max = 0;
         errorCount = 0;
         lastSnap = upToDateAsOf;
+        responseTime = new Histogram(3);
+        serviceTime = new Histogram(3);
+        waitTime = new Histogram(3);
+
         return report;
     }
 
@@ -164,4 +154,14 @@ public final class Timer
             reportRequest = null;
         }
     }
+
+    public void intendedTimeNs(long v)
+    {
+        intendedTimeNs = v;
+    }
+
+    public void start()
+    {
+        startTimeNs = System.nanoTime();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
index 403bee0..fa95fdb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.util;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.util;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -38,14 +38,12 @@ public class Timing
     // Probably the CopyOnWriteArrayList could be changed to an ordinary list as well.
     private final Map<String, List<Timer>> timers = new TreeMap<>();
     private volatile TimingIntervals history;
-    private final int historySampleCount;
-    private final int reportSampleCount;
     private boolean done;
+    private boolean isFixed;
 
-    public Timing(int historySampleCount, int reportSampleCount)
+    public Timing(boolean isFixed)
     {
-        this.historySampleCount = historySampleCount;
-        this.reportSampleCount = reportSampleCount;
+        this.isFixed = isFixed;
     }
 
     // TIMING
@@ -111,20 +109,20 @@ public class Timing
                 done &= !timer.running();
             }
 
-            intervals.put(entry.getKey(), TimingInterval.merge(operationIntervals, reportSampleCount,
+            intervals.put(entry.getKey(), TimingInterval.merge(operationIntervals,
                                                               history.get(entry.getKey()).endNanos()));
         }
 
         TimingIntervals result = new TimingIntervals(intervals);
         this.done = done;
-        history = history.merge(result, historySampleCount, history.startNanos());
+        history = history.merge(result, history.startNanos());
         return new TimingResult<>(extra, result);
     }
 
     // build a new timer and add it to the set of running timers.
-    public Timer newTimer(String opType, int sampleCount)
+    public Timer newTimer(String opType)
     {
-        final Timer timer = new Timer(sampleCount);
+        final Timer timer = new Timer(isFixed);
 
         if (!timers.containsKey(opType))
             timers.put(opType, new ArrayList<Timer>());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
index ede235c..bb9587f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.util;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,167 +8,193 @@ package org.apache.cassandra.stress.util;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
+import org.HdrHistogram.Histogram;
 
 // represents measurements taken over an interval of time
 // used for both single timer results and merged timer results
 public final class TimingInterval
 {
+    private final Histogram responseTime;
+    private final Histogram serviceTime;
+    private final Histogram waitTime;
+
     public static final long[] EMPTY_SAMPLE = new long[0];
     // nanos
-    private final long start;
-    private final long end;
-    public final long maxLatency;
-    public final long pauseLength;
+    private final long startNs;
+    private final long endNs;
     public final long pauseStart;
-    public final long totalLatency;
 
     // discrete
     public final long partitionCount;
     public final long rowCount;
-    public final long operationCount;
     public final long errorCount;
+    public final boolean isFixed;
 
-    final SampleOfLongs sample;
 
     public String toString()
     {
-        return String.format("Start: %d end: %d maxLatency: %d pauseLength: %d pauseStart: %d totalLatency: %d" +
-                             " pCount: %d rcount: %d opCount: %d errors: %d", start, end, maxLatency, pauseLength,
-                             pauseStart, totalLatency, partitionCount, rowCount, operationCount, errorCount);
+        return String.format("Start: %d end: %d maxLatency: %d pauseStart: %d" +
+                             " pCount: %d rcount: %d opCount: %d errors: %d",
+                             startNs, endNs, getLatencyHistogram().getMaxValue(), pauseStart,
+                             partitionCount, rowCount, getLatencyHistogram().getTotalCount(), errorCount);
     }
 
     TimingInterval(long time)
     {
-        start = end = time;
-        maxLatency = totalLatency = 0;
-        partitionCount = rowCount = operationCount = errorCount = 0;
-        pauseStart = pauseLength = 0;
-        sample = new SampleOfLongs(EMPTY_SAMPLE, 1d);
+        startNs = endNs = time;
+        partitionCount = rowCount = errorCount = 0;
+        pauseStart = 0;
+        responseTime = new Histogram(3);
+        serviceTime = new Histogram(3);
+        waitTime = new Histogram(3);
+        isFixed = false;
     }
 
-    TimingInterval(long start, long end, long maxLatency, long pauseStart, long pauseLength, long partitionCount,
-                   long rowCount, long totalLatency, long operationCount, long errorCount, SampleOfLongs sample)
+    TimingInterval(long start, long end, long maxPauseStart, long partitionCount,
+                   long rowCount, long errorCount, Histogram r, Histogram s, Histogram w, boolean isFixed)
     {
-        this.start = start;
-        this.end = Math.max(end, start);
-        this.maxLatency = maxLatency;
+        this.startNs = start;
+        this.endNs = Math.max(end, start);
         this.partitionCount = partitionCount;
         this.rowCount = rowCount;
-        this.totalLatency = totalLatency;
         this.errorCount = errorCount;
-        this.operationCount = operationCount;
-        this.pauseStart = pauseStart;
-        this.pauseLength = pauseLength;
-        this.sample = sample;
+        this.pauseStart = maxPauseStart;
+        this.responseTime = r;
+        this.serviceTime = s;
+        this.waitTime = w;
+        this.isFixed = isFixed;
+
     }
 
     // merge multiple timer intervals together
-    static TimingInterval merge(Iterable<TimingInterval> intervals, int maxSamples, long start)
+    static TimingInterval merge(Iterable<TimingInterval> intervals, long start)
     {
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
-        long operationCount = 0, partitionCount = 0, rowCount = 0, errorCount = 0;
-        long maxLatency = 0, totalLatency = 0;
-        List<SampleOfLongs> latencies = new ArrayList<>();
+        long partitionCount = 0, rowCount = 0, errorCount = 0;
         long end = 0;
-        long pauseStart = 0, pauseEnd = Long.MAX_VALUE;
+        long pauseStart = 0;
+        Histogram responseTime = new Histogram(3);
+        Histogram serviceTime = new Histogram(3);
+        Histogram waitTime = new Histogram(3);
+        boolean isFixed = false;
         for (TimingInterval interval : intervals)
         {
             if (interval != null)
             {
-                end = Math.max(end, interval.end);
-                operationCount += interval.operationCount;
-                maxLatency = Math.max(interval.maxLatency, maxLatency);
-                totalLatency += interval.totalLatency;
+                end = Math.max(end, interval.endNs);
                 partitionCount += interval.partitionCount;
                 rowCount += interval.rowCount;
                 errorCount += interval.errorCount;
-                latencies.addAll(Arrays.asList(interval.sample));
-                if (interval.pauseLength > 0)
+
+                if (interval.getLatencyHistogram().getMaxValue() > serviceTime.getMaxValue())
                 {
-                    pauseStart = Math.max(pauseStart, interval.pauseStart);
-                    pauseEnd = Math.min(pauseEnd, interval.pauseStart + interval.pauseLength);
+                    pauseStart = interval.pauseStart;
                 }
+                responseTime.add(interval.responseTime);
+                serviceTime.add(interval.serviceTime);
+                waitTime.add(interval.waitTime);
+                isFixed |= interval.isFixed;
             }
         }
 
-        if (pauseEnd < pauseStart || pauseStart <= 0)
-        {
-            pauseEnd = pauseStart = 0;
-        }
 
-        return new TimingInterval(start, end, maxLatency, pauseStart, pauseEnd - pauseStart, partitionCount, rowCount,
-                                  totalLatency, operationCount, errorCount, SampleOfLongs.merge(rnd, latencies, maxSamples));
+        return new TimingInterval(start, end, pauseStart, partitionCount, rowCount,
+                                  errorCount, responseTime, serviceTime, waitTime, isFixed);
 
     }
 
     public double opRate()
     {
-        return operationCount / ((end - start) * 0.000000001d);
+        return getLatencyHistogram().getTotalCount() / ((endNs - startNs) * 0.000000001d);
     }
 
     public double adjustedRowRate()
     {
-        return rowCount / ((end - (start + pauseLength)) * 0.000000001d);
+        return rowCount / ((endNs - (startNs + getLatencyHistogram().getMaxValue())) * 0.000000001d);
     }
 
     public double partitionRate()
     {
-        return partitionCount / ((end - start) * 0.000000001d);
+        return partitionCount / ((endNs - startNs) * 0.000000001d);
     }
 
     public double rowRate()
     {
-        return rowCount / ((end - start) * 0.000000001d);
+        return rowCount / ((endNs - startNs) * 0.000000001d);
     }
 
-    public double meanLatency()
+    public double meanLatencyMs()
     {
-        return (totalLatency / (double) operationCount) * 0.000001d;
+        return getLatencyHistogram().getMean() * 0.000001d;
     }
 
-    public double maxLatency()
+    public double maxLatencyMs()
     {
-        return maxLatency * 0.000001d;
+        return getLatencyHistogram().getMaxValue() * 0.000001d;
     }
 
-    public double medianLatency()
+    public double medianLatencyMs()
     {
-        return sample.medianLatency();
+        return getLatencyHistogram().getValueAtPercentile(50.0) * 0.000001d;
     }
 
-    // 0 < rank < 1
-    public double rankLatency(float rank)
+
+    /**
+     * @param percentile between 0.0 and 100.0
+     * @return latency in milliseconds at percentile
+     */
+    public double latencyAtPercentileMs(double percentile)
     {
-        return sample.rankLatency(rank);
+        return getLatencyHistogram().getValueAtPercentile(percentile) * 0.000001d;
     }
 
-    public long runTime()
+    public long runTimeMs()
     {
-        return (end - start) / 1000000;
+        return (endNs - startNs) / 1000000;
     }
 
-    public final long endNanos()
+    public long endNanos()
     {
-        return end;
+        return endNs;
     }
 
     public long startNanos()
     {
-        return start;
+        return startNs;
+    }
+
+    public Histogram responseTime()
+    {
+        return responseTime;
+    }
+
+    public Histogram serviceTime()
+    {
+        return serviceTime;
+    }
+
+    public Histogram waitTime()
+    {
+        return waitTime;
+    }
+
+    private Histogram getLatencyHistogram()
+    {
+        if (!isFixed || responseTime.getTotalCount() == 0)
+            return serviceTime;
+        else
+            return responseTime;
     }
 
     public static enum TimingParameter
@@ -182,7 +208,7 @@ public final class TimingInterval
         return getStringValue(value, Float.NaN);
     }
 
-    String getStringValue(TimingParameter value, float rank)
+    String getStringValue(TimingParameter value, double rank)
     {
         switch (value)
         {
@@ -190,14 +216,19 @@ public final class TimingInterval
             case ROWRATE:        return String.format("%,.0f", rowRate());
             case ADJROWRATE:     return String.format("%,.0f", adjustedRowRate());
             case PARTITIONRATE:  return String.format("%,.0f", partitionRate());
-            case MEANLATENCY:    return String.format("%,.1f", meanLatency());
-            case MAXLATENCY:     return String.format("%,.1f", maxLatency());
-            case MEDIANLATENCY:  return String.format("%,.1f", medianLatency());
-            case RANKLATENCY:    return String.format("%,.1f", rankLatency(rank));
+            case MEANLATENCY:    return String.format("%,.1f", meanLatencyMs());
+            case MAXLATENCY:     return String.format("%,.1f", maxLatencyMs());
+            case MEDIANLATENCY:  return String.format("%,.1f", medianLatencyMs());
+            case RANKLATENCY:    return String.format("%,.1f", latencyAtPercentileMs(rank));
             case ERRORCOUNT:     return String.format("%,d", errorCount);
             case PARTITIONCOUNT: return String.format("%,d", partitionCount);
             default:             throw new IllegalStateException();
         }
     }
+
+    public long operationCount()
+    {
+        return getLatencyHistogram().getTotalCount();
+    }
  }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/89f275c6/tools/stress/src/org/apache/cassandra/stress/util/TimingIntervals.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/TimingIntervals.java b/tools/stress/src/org/apache/cassandra/stress/util/TimingIntervals.java
index 17680a7..0586006 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/TimingIntervals.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/TimingIntervals.java
@@ -20,7 +20,7 @@ public class TimingIntervals
         this.intervals = intervals;
     }
 
-    public TimingIntervals merge(TimingIntervals with, int maxSamples, long start)
+    public TimingIntervals merge(TimingIntervals with, long start)
     {
         assert intervals.size() == with.intervals.size();
         TreeMap<String, TimingInterval> ret = new TreeMap<>();
@@ -28,7 +28,7 @@ public class TimingIntervals
         for (String opType : intervals.keySet())
         {
             assert with.intervals.containsKey(opType);
-            ret.put(opType, TimingInterval.merge(Arrays.asList(intervals.get(opType), with.intervals.get(opType)), maxSamples, start));
+            ret.put(opType, TimingInterval.merge(Arrays.asList(intervals.get(opType), with.intervals.get(opType)), start));
         }
 
         return new TimingIntervals(ret);
@@ -39,21 +39,21 @@ public class TimingIntervals
         return intervals.get(opType);
     }
 
-    public TimingInterval combine(int maxSamples)
+    public TimingInterval combine()
     {
         long start = Long.MAX_VALUE;
         for (TimingInterval ti : intervals.values())
             start = Math.min(start, ti.startNanos());
 
-        return TimingInterval.merge(intervals.values(), maxSamples, start);
+        return TimingInterval.merge(intervals.values(), start);
     }
 
     public String str(TimingInterval.TimingParameter value, String unit)
     {
-        return str(value, Float.NaN, unit);
+        return str(value, Double.NaN, unit);
     }
 
-    public String str(TimingInterval.TimingParameter value, float rank, String unit)
+    public String str(TimingInterval.TimingParameter value, double rank, String unit)
     {
         StringBuilder sb = new StringBuilder("[");
 
@@ -106,7 +106,7 @@ public class TimingIntervals
         return str(TimingInterval.TimingParameter.MEDIANLATENCY, "ms");
     }
 
-    public String rankLatencies(float rank)
+    public String latenciesAtPercentile(double rank)
     {
         return str(TimingInterval.TimingParameter.RANKLATENCY, rank, "ms");
     }


Mime
View raw message