accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo-testing] branch master updated: Use hints (apache/accumulo#555) in scan executor test (#31)
Date Thu, 02 Aug 2018 17:37:36 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aff9ce  Use hints (apache/accumulo#555) in scan executor test (#31)
6aff9ce is described below

commit 6aff9ce2295d6658775f93539967b785c155d604
Author: Keith Turner <keith@deenlo.com>
AuthorDate: Thu Aug 2 13:37:35 2018 -0400

    Use hints (apache/accumulo#555) in scan executor test (#31)
---
 .../core/performance/tests/ScanExecutorPT.java     | 63 ++++++++++++++++------
 1 file changed, 46 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/testing/core/performance/tests/ScanExecutorPT.java
b/core/src/main/java/org/apache/accumulo/testing/core/performance/tests/ScanExecutorPT.java
index ecfa182..3dbd5da 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/performance/tests/ScanExecutorPT.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/performance/tests/ScanExecutorPT.java
@@ -34,7 +34,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.spi.scan.IdleRatioScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.HintScanPrioritizer;
 import org.apache.accumulo.testing.core.performance.Environment;
 import org.apache.accumulo.testing.core.performance.PerformanceTest;
 import org.apache.accumulo.testing.core.performance.Report;
@@ -43,6 +43,7 @@ import org.apache.accumulo.testing.core.performance.util.TestData;
 import org.apache.accumulo.testing.core.performance.util.TestExecutor;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
 public class ScanExecutorPT implements PerformanceTest {
@@ -55,10 +56,13 @@ public class ScanExecutorPT implements PerformanceTest {
   private static final int NUM_QUALS = 10;
 
   private static final String SCAN_EXECUTOR_THREADS = "2";
-  private static final String SCAN_PRIORITIZER = IdleRatioScanPrioritizer.class.getName();
+  private static final String SCAN_PRIORITIZER = HintScanPrioritizer.class.getName();
 
-  private static final String TEST_DESC = "Scan Executor Test.  Test running lots of short
scans while long scans are running in the background.  Each short scan reads a random row
and family. A scan prioritizer that favors short scans is configured.  If the scan prioritizer
is not working properly, then the short "
-      + "scans will be orders of magnitude slower.";
+  private static final String TEST_DESC = "Scan Executor Test.  Test running lots of short
scans "
+      + "while long scans are running in the background.  Each short scan reads a random
row and "
+      + "family. Using execution hints, short scans are randomly either given a high priority
or "
+      + "a dedicated executor.  If the scan prioritizer or dispatcher is not working properly,
"
+      + "then the short scans will be orders of magnitude slower.";
 
   @Override
   public SystemConfiguration getConfiguration() {
@@ -66,8 +70,14 @@ public class ScanExecutorPT implements PerformanceTest {
 
     siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "200");
     siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "200");
-    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads", SCAN_EXECUTOR_THREADS);
-    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.prioritizer", SCAN_PRIORITIZER);
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads",
+        SCAN_EXECUTOR_THREADS);
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.prioritizer",
+        SCAN_PRIORITIZER);
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.threads",
+        SCAN_EXECUTOR_THREADS);
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.prioritizer",
+        SCAN_PRIORITIZER);
 
     return new SystemConfiguration().setAccumuloConfig(siteCfg);
   }
@@ -79,8 +89,10 @@ public class ScanExecutorPT implements PerformanceTest {
 
     Map<String,String> props = new HashMap<>();
     props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "executor", "se1");
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "heed_hints", "true");
 
-    env.getConnector().tableOperations().create(tableName, new NewTableConfiguration().setProperties(props));
+    env.getConnector().tableOperations().create(tableName,
+        new NewTableConfiguration().setProperties(props));
 
     long t1 = System.currentTimeMillis();
     TestData.generate(env.getConnector(), tableName, NUM_ROWS, NUM_FAMS, NUM_QUALS);
@@ -109,24 +121,30 @@ public class ScanExecutorPT implements PerformanceTest {
     builder.info("compact", NUM_ROWS * NUM_FAMS * NUM_QUALS, t3 - t2, "Compact rate entries/sec
");
     builder.info("short_times1", shortStats1, "Times in ms for each short scan.  First run.");
     builder.info("short_times2", shortStats2, "Times in ms for each short scan. Second run.");
-    builder.result("short", shortStats2.getAverage(), "Average times in ms for short scans
from 2nd run.");
+    builder.result("short", shortStats2.getAverage(),
+        "Average times in ms for short scans from 2nd run.");
     builder.info("long_counts", longStats, "Entries read by each long scan threads");
-    builder.info("long", longStats.getSum(), (t4-t3), "Combined rate in entries/second of
all long scans");
+    builder.info("long", longStats.getSum(), (t4 - t3),
+        "Combined rate in entries/second of all long scans");
     builder.parameter("short_threads", NUM_SHORT_SCANS_THREADS, "Threads used to run short
scans.");
-    builder.parameter("long_threads", NUM_LONG_SCANS, "Threads running long scans.  Each
thread repeatedly scans entire table for duration of test.");
+    builder.parameter("long_threads", NUM_LONG_SCANS,
+        "Threads running long scans.  Each thread repeatedly scans entire table for duration
of test.");
     builder.parameter("rows", NUM_ROWS, "Rows in test table");
     builder.parameter("familes", NUM_FAMS, "Families per row in test table");
     builder.parameter("qualifiers", NUM_QUALS, "Qualifiers per family in test table");
-    builder.parameter("server_scan_threads", SCAN_EXECUTOR_THREADS, "Server side scan handler
threads");
+    builder.parameter("server_scan_threads", SCAN_EXECUTOR_THREADS,
+        "Server side scan handler threads");
     builder.parameter("prioritizer", SCAN_PRIORITIZER, "Server side scan prioritizer");
 
     return builder.build();
   }
 
-  private static long scan(String tableName, Connector c, byte[] row, byte[] fam) throws
TableNotFoundException {
+  private static long scan(String tableName, Connector c, byte[] row, byte[] fam,
+      Map<String,String> hints) throws TableNotFoundException {
     long t1 = System.currentTimeMillis();
     int count = 0;
     try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
+      scanner.setExecutionHints(hints);
       scanner.setRange(Range.exact(new Text(row), new Text(fam)));
       if (Iterables.size(scanner) != NUM_QUALS) {
         throw new RuntimeException("bad count " + count);
@@ -136,10 +154,12 @@ public class ScanExecutorPT implements PerformanceTest {
     return System.currentTimeMillis() - t1;
   }
 
-  private long scan(String tableName, Connector c, AtomicBoolean stop) throws TableNotFoundException
{
+  private long scan(String tableName, Connector c, AtomicBoolean stop, Map<String,String>
hints)
+      throws TableNotFoundException {
     long count = 0;
     while (!stop.get()) {
       try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setExecutionHints(hints);
         for (Entry<Key,Value> entry : scanner) {
           count++;
           if (stop.get()) {
@@ -151,15 +171,22 @@ public class ScanExecutorPT implements PerformanceTest {
     return count;
   }
 
-  private LongSummaryStatistics runShortScans(Environment env, String tableName, int numScans)
throws InterruptedException, ExecutionException {
+  private LongSummaryStatistics runShortScans(Environment env, String tableName, int numScans)
+      throws InterruptedException, ExecutionException {
 
-    try(TestExecutor<Long> executor = new TestExecutor<>(NUM_SHORT_SCANS_THREADS))
{
+    Map<String,String> execHints = ImmutableMap.of("executor", "se2");
+    Map<String,String> prioHints = ImmutableMap.of("priority", "1");
+
+    try (TestExecutor<Long> executor = new TestExecutor<>(NUM_SHORT_SCANS_THREADS))
{
       Random rand = new Random();
 
       for (int i = 0; i < numScans; i++) {
         byte[] row = TestData.row(rand.nextInt(NUM_ROWS));
         byte[] fam = TestData.fam(rand.nextInt(NUM_FAMS));
-        executor.submit(() -> scan(tableName, env.getConnector(), row, fam));
+        // scans have a 20% chance of getting dedicated thread pool and 80% chance of getting
high
+        // priority
+        Map<String,String> hints = rand.nextInt(10) <= 1 ? execHints : prioHints;
+        executor.submit(() -> scan(tableName, env.getConnector(), row, fam, hints));
       }
 
       return executor.stream().mapToLong(l -> l).summaryStatistics();
@@ -167,10 +194,12 @@ public class ScanExecutorPT implements PerformanceTest {
   }
 
   private TestExecutor<Long> startLongScans(Environment env, String tableName, AtomicBoolean
stop) {
+    Map<String,String> hints = ImmutableMap.of("priority", "2");
+
     TestExecutor<Long> longScans = new TestExecutor<>(NUM_LONG_SCANS);
 
     for (int i = 0; i < NUM_LONG_SCANS; i++) {
-      longScans.submit(() -> scan(tableName, env.getConnector(), stop));
+      longScans.submit(() -> scan(tableName, env.getConnector(), stop, hints));
     }
     return longScans;
   }


Mime
View raw message