cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [2/3] git commit: ninja-fix stress: rate limit not honoured for warmup, and 'auto' rate mode optional with thread ranges
Date Sat, 20 Sep 2014 07:44:58 GMT
ninja-fix stress: rate limit not honoured for warmup, and 'auto' rate mode optional with thread ranges


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eecc034b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eecc034b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eecc034b

Branch: refs/heads/trunk
Commit: eecc034b686b4f2998fbc8045cbc4c7a1e4f0902
Parents: 6e5dd23
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Sat Sep 20 08:44:37 2014 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Sat Sep 20 08:44:37 2014 +0100

----------------------------------------------------------------------
 .../apache/cassandra/stress/StressAction.java   | 34 +++++++++++---------
 .../cassandra/stress/settings/SettingsRate.java | 24 +++++++-------
 2 files changed, 31 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eecc034b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index da32284..b50637f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -56,19 +56,24 @@ public class StressAction implements Runnable
         // creating keyspace and column families
         settings.maybeCreateKeyspaces();
 
-        // TODO: warmup should
+        // TODO: warmup should operate configurably over op/pk/row, and be of configurable
length
         if (!settings.command.noWarmup)
             warmup(settings.command.getFactory(settings));
 
         output.println("Sleeping 2s...");
         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
 
+        // TODO : move this to a new queue wrapper that gates progress based on a poisson
(or configurable) distribution
+        RateLimiter rateLimiter = null;
+        if (settings.rate.opRateTargetPerSecond > 0)
+            rateLimiter = RateLimiter.create(settings.rate.opRateTargetPerSecond);
+
         boolean success;
-        if (settings.rate.auto)
-            success = runAuto();
+        if (settings.rate.minThreads > 0)
+            success = runMulti(settings.rate.auto, rateLimiter);
         else
             success = null != run(settings.command.getFactory(settings), settings.rate.threadCount,
settings.command.count,
-                                  settings.command.duration, settings.command.durationUnits,
output);
+                                  settings.command.duration, rateLimiter, settings.command.durationUnits,
output);
 
         if (success)
             output.println("END");
@@ -89,16 +94,18 @@ public class StressAction implements Runnable
             // we need to warm up all the nodes in the cluster ideally, but we may not be
the only stress instance;
             // so warm up all the nodes we're speaking to only.
             output.println(String.format("Warming up %s with %d iterations...", single.desc(),
iterations));
-            run(single, 20, iterations, 0, null, warmupOutput);
+            run(single, 20, iterations, 0, null, null, warmupOutput);
         }
     }
 
     // TODO : permit varying more than just thread count
     // TODO : vary thread count based on percentage improvement of previous increment, not
by fixed amounts
-    private boolean runAuto()
+    private boolean runMulti(boolean auto, RateLimiter rateLimiter)
     {
+        if (settings.command.targetUncertainty >= 0)
+            output.println("WARNING: uncertainty mode (err<) results in uneven workload
between thread runs, so should be used for high level analysis only");
         int prevThreadCount = -1;
-        int threadCount = settings.rate.minAutoThreads;
+        int threadCount = settings.rate.minThreads;
         List<StressMetrics> results = new ArrayList<>();
         List<String> runIds = new ArrayList<>();
         do
@@ -106,7 +113,7 @@ public class StressAction implements Runnable
             output.println(String.format("Running with %d threadCount", threadCount));
 
             StressMetrics result = run(settings.command.getFactory(settings), threadCount,
settings.command.count,
-                                       settings.command.duration, settings.command.durationUnits,
output);
+                                       settings.command.duration, rateLimiter, settings.command.durationUnits,
output);
             if (result == null)
                 return false;
             results.add(result);
@@ -122,7 +129,7 @@ public class StressAction implements Runnable
             else
                 threadCount *= 1.5;
 
-            if (!results.isEmpty() && threadCount > settings.rate.maxAutoThreads)
+            if (!results.isEmpty() && threadCount > settings.rate.maxThreads)
                 break;
 
             if (settings.command.type.updates)
@@ -139,7 +146,7 @@ public class StressAction implements Runnable
                 }
             }
             // run until we have not improved throughput significantly for previous three
runs
-        } while (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results,
5, settings.command.targetUncertainty));
+        } while (!auto || (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results,
5, settings.command.targetUncertainty)));
 
         // summarise all results
         StressMetrics.summarise(runIds, results, output);
@@ -163,7 +170,7 @@ public class StressAction implements Runnable
         return improvement / count;
     }
 
-    private StressMetrics run(OpDistributionFactory operations, int threadCount, long opCount,
long duration, TimeUnit durationUnits, PrintStream output)
+    private StressMetrics run(OpDistributionFactory operations, int threadCount, long opCount,
long duration, RateLimiter rateLimiter, TimeUnit durationUnits, PrintStream output)
     {
         output.println(String.format("Running %s with %d threads %s",
                                      operations.desc(),
@@ -177,11 +184,6 @@ public class StressAction implements Runnable
         else
             workManager = new FixedWorkManager(opCount);
 
-        RateLimiter rateLimiter = null;
-        // TODO : move this to a new queue wrapper that gates progress based on a poisson
(or configurable) distribution
-        if (settings.rate.opRateTargetPerSecond > 0)
-            rateLimiter = RateLimiter.create(settings.rate.opRateTargetPerSecond);
-
         final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis,
settings);
 
         final CountDownLatch done = new CountDownLatch(threadCount);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eecc034b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
index a91f073..0486678 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
@@ -30,8 +30,8 @@ public class SettingsRate implements Serializable
 {
 
     public final boolean auto;
-    public final int minAutoThreads;
-    public final int maxAutoThreads;
+    public final int minThreads;
+    public final int maxThreads;
     public final int threadCount;
     public final int opRateTargetPerSecond;
 
@@ -41,15 +41,15 @@ public class SettingsRate implements Serializable
         threadCount = Integer.parseInt(options.threads.value());
         String rateOpt = options.rate.value();
         opRateTargetPerSecond = Integer.parseInt(rateOpt.substring(0, rateOpt.length() -
2));
-        minAutoThreads = -1;
-        maxAutoThreads = -1;
+        minThreads = -1;
+        maxThreads = -1;
     }
 
     public SettingsRate(AutoOptions auto)
     {
-        this.auto = true;
-        this.minAutoThreads = Integer.parseInt(auto.minThreads.value());
-        this.maxAutoThreads = Integer.parseInt(auto.maxThreads.value());
+        this.auto = auto.auto.setByUser();
+        this.minThreads = Integer.parseInt(auto.minThreads.value());
+        this.maxThreads = Integer.parseInt(auto.maxThreads.value());
         this.threadCount = -1;
         this.opRateTargetPerSecond = 0;
     }
@@ -59,14 +59,14 @@ public class SettingsRate implements Serializable
 
     private static final class AutoOptions extends GroupedOptions
     {
-        final OptionSimple auto = new OptionSimple("auto", "", null, "test with increasing
number of threadCount until performance plateaus", false);
+        final OptionSimple auto = new OptionSimple("auto", "", null, "stop increasing threads
once throughput saturates", false);
         final OptionSimple minThreads = new OptionSimple("threads>=", "[0-9]+", "4", "run
at least this many clients concurrently", false);
         final OptionSimple maxThreads = new OptionSimple("threads<=", "[0-9]+", "1000",
"run at most this many clients concurrently", false);
 
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(auto, minThreads, maxThreads);
+            return Arrays.asList(minThreads, maxThreads, auto);
         }
     }
 
@@ -96,11 +96,13 @@ public class SettingsRate implements Serializable
                     if (command.count > 0)
                     {
                         ThreadOptions options = new ThreadOptions();
-                        options.accept("threads=50");
+                        options.accept("threads=200");
                         return new SettingsRate(options);
                     }
             }
-            return new SettingsRate(new AutoOptions());
+            AutoOptions options = new AutoOptions();
+            options.accept("auto");
+            return new SettingsRate(options);
         }
         GroupedOptions options = GroupedOptions.select(params, new AutoOptions(), new ThreadOptions());
         if (options == null)


Mime
View raw message