accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo-testing] branch master updated: Improve yield+scan exec performance test (#57)
Date Fri, 22 Feb 2019 18:43:09 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/master by this push:
     new 45edc84  Improve yield+scan exec performance test (#57)
45edc84 is described below

commit 45edc8434e3a5d970c613a16136d258d03ee43e8
Author: Keith Turner <kturner@apache.org>
AuthorDate: Fri Feb 22 13:43:05 2019 -0500

    Improve yield+scan exec performance test (#57)
    
    Before this commit the filters running in the background had a very low
    chance of returning data.  This did not exercise the use case of an
    iterator returning data very slowly.  Returning data very slowly results
    in slowly filling a buffer.  The test was modified to run the filter
    scans with multiple probabilities which causes data to return with
    different velocities.  Also the yield filter was modified to support
    yielding across returning a key/value.
    
    The design of the yield filter was improved so that subclasses do not
    have to override init().
---
 .../performance/tests/ProbabilityFilter.java       | 16 ++---------
 .../testing/performance/tests/YieldingFilter.java  |  9 ++++---
 .../performance/tests/YieldingScanExecutorPT.java  | 31 +++++++++-------------
 3 files changed, 20 insertions(+), 36 deletions(-)

diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
b/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
index aa807e0..5c37201 100644
--- a/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
@@ -1,28 +1,16 @@
 package org.apache.accumulo.testing.performance.tests;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.Random;
 import java.util.function.BiPredicate;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
 public class ProbabilityFilter extends YieldingFilter {
-
-  private double matchProbability;
-
-  @Override
-  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String>
options,
-      IteratorEnvironment env) throws IOException {
-    super.init(source, options, env);
-    this.matchProbability = Double.parseDouble(options.get("probability"));
-  }
-
   @Override
-  protected BiPredicate<Key, Value> createPredicate() {
+  protected BiPredicate<Key, Value> createPredicate(Map<String,String> options)
{
+    double matchProbability = Double.parseDouble(options.get("probability"));
     Random rand = new Random();
     return (k,v) -> rand.nextDouble() < matchProbability;
   }
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
index f514059..2e920cc 100644
--- a/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
@@ -12,7 +12,6 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.YieldCallback;
-import org.slf4j.LoggerFactory;
 
 public abstract class YieldingFilter implements SortedKeyValueIterator<Key,Value> {
 
@@ -20,24 +19,26 @@ public abstract class YieldingFilter implements SortedKeyValueIterator<Key,Value
   private BiPredicate<Key,Value> predicate;
   private YieldCallback<Key> yield;
   private long yieldTime;
+  private long start;
 
-  protected abstract BiPredicate<Key,Value> createPredicate();
+  protected abstract BiPredicate<Key,Value> createPredicate(Map<String,String>
options);
 
   @Override
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String>
options,
       IteratorEnvironment env) throws IOException {
     this.source = source;
-    this.predicate = createPredicate();
+    this.predicate = createPredicate(options);
     this.yieldTime = Long.parseLong(options.getOrDefault("yieldTimeMS", "100"));
+    start = System.nanoTime();
   }
 
   protected void findTop() throws IOException {
-    long start = System.nanoTime();
     while (source.hasTop() && !source.getTopKey().isDeleted()
         && !predicate.test(source.getTopKey(), source.getTopValue())) {
       long duration = (System.nanoTime() - start) / 1000000;
       if (duration > yieldTime) {
         yield.yield(source.getTopKey());
+        start = System.nanoTime();
         break;
       }
 
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
index dd94d37..5d50007 100644
--- a/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
@@ -18,6 +18,7 @@
 package org.apache.accumulo.testing.performance.tests;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -43,8 +44,8 @@ import org.apache.accumulo.testing.performance.util.TestData;
 import org.apache.accumulo.testing.performance.util.TestExecutor;
 import org.apache.hadoop.io.Text;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 
 public class YieldingScanExecutorPT implements PerformanceTest {
 
@@ -66,7 +67,7 @@ public class YieldingScanExecutorPT implements PerformanceTest {
       + "are working correctly then the short scans should have very short response times.
 This "
       + "happens because the filters should end up in a separate thread pool than the short
scan.";
 
-  private static final String FILTER_PROBABILITY = "0.000001";
+  private static final String FILTER_PROBABILITIES = "0.01,0.001,0.0001,0.00001,0.000001";
   private static final String FILTER_YIELD_TIME = "1000";
 
   private static final String QUICK_SCAN_TIME = "500";
@@ -141,8 +142,9 @@ public class YieldingScanExecutorPT implements PerformanceTest {
     builder.parameter("server_scan_threads", SCAN_EXECUTOR_THREADS,
         "Server side scan handler threads that each executor has.  There are 2 executors.");
 
-    builder.parameter("filter_probability", FILTER_PROBABILITY, "The chance that one of the
long "
-        + "filter scans will return any key it sees.");
+    builder.parameter("filter_probabilities", FILTER_PROBABILITIES, "The chances that one
of the long "
+        + "filter scans will return any key it sees. The probabilites are cycled through
when "
+        + "starting long scans.");
     builder.parameter("filter_yield_time", FILTER_YIELD_TIME, "The time in ms after which
one of "
         + "the long filter scans will yield.");
     builder.parameter("quick_scan_time", QUICK_SCAN_TIME, "The threshold time in ms for deciding
"
@@ -152,12 +154,11 @@ public class YieldingScanExecutorPT implements PerformanceTest {
     return builder.build();
   }
 
-  private static long scan(String tableName, AccumuloClient c, byte[] row, byte[] fam,
-      Map<String,String> hints) throws TableNotFoundException {
+  private static long scan(String tableName, AccumuloClient c, byte[] row, byte[] fam)
+      throws TableNotFoundException {
     long t1 = System.currentTimeMillis();
     int count = 0;
     try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
-      // scanner.setExecutionHints(hints);
       scanner.setRange(Range.exact(new Text(row), new Text(fam)));
       if (Iterables.size(scanner) != NUM_QUALS) {
         throw new RuntimeException("bad count " + count);
@@ -167,14 +168,14 @@ public class YieldingScanExecutorPT implements PerformanceTest {
     return System.currentTimeMillis() - t1;
   }
 
-  private long scan(String tableName, AccumuloClient c, AtomicBoolean stop, Map<String,String>
hints)
+  private long scan(String tableName, AccumuloClient c, AtomicBoolean stop, String filterProbability)
       throws TableNotFoundException {
     long count = 0;
     while (!stop.get()) {
       try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
 
         IteratorSetting is = new IteratorSetting(30, ProbabilityFilter.class);
-        is.addOption("probability", FILTER_PROBABILITY);
+        is.addOption("probability", filterProbability);
         is.addOption("yieldTimeMS", FILTER_YIELD_TIME);
 
         scanner.addScanIterator(is);
@@ -195,19 +196,13 @@ public class YieldingScanExecutorPT implements PerformanceTest {
   private LongSummaryStatistics runShortScans(Environment env, String tableName, int numScans)
       throws InterruptedException, ExecutionException {
 
-    Map<String,String> execHints = ImmutableMap.of("executor", "se2");
-    Map<String,String> prioHints = ImmutableMap.of("priority", "1");
-
     try (TestExecutor<Long> executor = new TestExecutor<>(NUM_SHORT_SCANS_THREADS))
{
       Random rand = new Random();
 
       for (int i = 0; i < numScans; i++) {
         byte[] row = TestData.row(rand.nextInt(NUM_ROWS));
         byte[] fam = TestData.fam(rand.nextInt(NUM_FAMS));
-        // scans have a 20% chance of getting dedicated thread pool and 80% chance of getting
high
-        // priority
-        Map<String,String> hints = rand.nextInt(10) <= 1 ? execHints : prioHints;
-        executor.submit(() -> scan(tableName, env.getClient(), row, fam, hints));
+        executor.submit(() -> scan(tableName, env.getClient(), row, fam));
       }
 
       return executor.stream().mapToLong(l -> l).summaryStatistics();
@@ -215,12 +210,12 @@ public class YieldingScanExecutorPT implements PerformanceTest {
   }
 
   private TestExecutor<Long> startLongScans(Environment env, String tableName, AtomicBoolean
stop) {
-    Map<String,String> hints = ImmutableMap.of("priority", "2");
 
+    Iterator<String> fpi = Iterators.cycle(FILTER_PROBABILITIES.split(","));
     TestExecutor<Long> longScans = new TestExecutor<>(NUM_LONG_SCANS);
 
     for (int i = 0; i < NUM_LONG_SCANS; i++) {
-      longScans.submit(() -> scan(tableName, env.getClient(), stop, hints));
+      longScans.submit(() -> scan(tableName, env.getClient(), stop, fpi.next()));
     }
     return longScans;
   }


Mime
View raw message