accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch master updated: Support dispatching and prioritizing by user scan types (#972)
Date Thu, 21 Feb 2019 17:02:32 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a8df9a  Support dispatching and prioritizing by user scan types (#972)
4a8df9a is described below

commit 4a8df9af165a211626e1fc8ad836251f11390153
Author: Keith Turner <kturner@apache.org>
AuthorDate: Thu Feb 21 12:02:27 2019 -0500

    Support dispatching and prioritizing by user scan types (#972)
    
    While working on apache/fluo#1055 I found it cumbersome to configure
    Fluo and Accumulo to dispatch Fluo notification scans to a dedicated
    executor.  With these changes it's much simpler to do that.  Fluo can
    set an execution hint like `scan_type=fluo-ntfy` and Accumulo can be
    configured to execute the scan based on the type without changing Fluo
    config or source code.
    
    Before these changes, Fluo would have required either a custom
    dispatcher or custom Fluo config for Accumulo executors. With these
    changes nothing needs to be done in Fluo.
---
 .../apache/accumulo/core/client/ScannerBase.java   |  5 ++
 .../core/spi/scan/HintScanPrioritizer.java         | 45 ++++++++--
 .../core/spi/scan/SimpleScanDispatcher.java        | 96 +++++++---------------
 .../core/spi/scan/HintScanPrioritizerTest.java     | 82 ++++++++++++++++++
 .../core/spi/scan/SimpleScanDispatcherTest.java    | 29 ++-----
 .../accumulo/core/spi/scan/TestScanInfo.java       |  7 ++
 6 files changed, 168 insertions(+), 96 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index 328c066..59acc0c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -341,6 +341,11 @@ public interface ScannerBase extends Iterable<Entry<Key,Value>>,
AutoCloseable {
    * scan, only how quickly it is returned.
    *
    * <p>
+   * Using the hint {@code scan_type=<type>} and documenting all of the types for your
application
+   * is one strategy to consider. This allows administrators to adjust executor and prioritizer
+   * config for your application scan types without having to change the application source
code.
+   *
+   * <p>
    * The default configuration for Accumulo will ignore hints. See {@link HintScanPrioritizer}
and
    * {@link SimpleScanDispatcher} for examples of classes that can react to hints.
    *
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
index 22063ef..f713861 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
@@ -24,14 +24,18 @@ import org.apache.accumulo.core.client.ScannerBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+
 /**
  * When configured for a scan executor, this prioritizer allows scanners to set priorities
as
- * integers.
+ * integers. Lower integers result in higher priority.
  *
  * <p>
- * Scanners should put the key/value {@code priority=<integer>} in the map passed to
- * {@link ScannerBase#setExecutionHints(Map)} to set the priority. Lower integers result
in higher
- * priority.
+ * Scanners can put the key/values {@code priority=<integer>} and/or {@code scan_type=<type>}
in the
+ * map passed to {@link ScannerBase#setExecutionHints(Map)} to set the priority. When a
+ * {@code priority} hint is set it takes precedence and the value is used as the priority.
When a
+ * {@code scan_type} hint is set the priority is looked up using the value.
  *
  * <p>
  * This prioritizer accepts the option {@code default_priority=<integer>} which determines
what
@@ -45,6 +49,10 @@ import org.slf4j.LoggerFactory;
  * option silently ignores invalid hints.
  *
  * <p>
+ * This prioritizer accepts the option {@code priority.<type>=<integer>} which
maps a scan type hint
+ * to a priority.
+ *
+ * <p>
  * When two scans have the same priority, the scan is prioritized based on last run time
and then
  * creation time.
  *
@@ -61,11 +69,14 @@ public class HintScanPrioritizer implements ScanPrioritizer {
 
   private static final Logger log = LoggerFactory.getLogger(HintScanPrioritizer.class);
 
+  private final String PRIO_PREFIX = "priority.";
+
   private enum HintProblemAction {
     NONE, LOG, FAIL
   }
 
-  private static int getPriority(ScanInfo si, int defaultPriority, HintProblemAction hpa)
{
+  private static int getPriority(ScanInfo si, int defaultPriority, HintProblemAction hpa,
+      Map<String,Integer> typePriorities) {
     String prio = si.getExecutionHints().get("priority");
     if (prio != null) {
       try {
@@ -86,6 +97,16 @@ public class HintScanPrioritizer implements ScanPrioritizer {
       }
     }
 
+    if (!typePriorities.isEmpty()) {
+      String scanType = si.getExecutionHints().get("scan_type");
+      if (scanType != null) {
+        Integer typePrio = typePriorities.get(scanType);
+        if (typePrio != null) {
+          return typePrio;
+        }
+      }
+    }
+
     return defaultPriority;
   }
 
@@ -94,10 +115,22 @@ public class HintScanPrioritizer implements ScanPrioritizer {
     int defaultPriority = Integer
         .parseInt(params.getOptions().getOrDefault("default_priority", Integer.MAX_VALUE
+ ""));
 
+    Builder<String,Integer> tpb = ImmutableMap.builder();
+
+    params.getOptions().forEach((k, v) -> {
+      if (k.startsWith(PRIO_PREFIX)) {
+        String type = k.substring(PRIO_PREFIX.length());
+        tpb.put(type, Integer.parseInt(v));
+      }
+    });
+
+    ImmutableMap<String,Integer> typePriorities = tpb.build();
+
     HintProblemAction hpa = HintProblemAction.valueOf(params.getOptions()
         .getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase());
 
-    Comparator<ScanInfo> cmp = Comparator.comparingInt(si -> getPriority(si, defaultPriority,
hpa));
+    Comparator<ScanInfo> cmp = Comparator
+        .comparingInt(si -> getPriority(si, defaultPriority, hpa, typePriorities));
 
     return cmp.thenComparingLong(si -> si.getLastRunTime().orElse(0))
         .thenComparingLong(ScanInfo::getCreationTime);
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
index 9653f4f..4cba357 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -20,12 +20,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.ScannerBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 
 /**
  * If no options are given, then this will dispatch to an executor named {@code default}.
This
@@ -38,21 +36,9 @@ import com.google.common.collect.Sets;
  * scans to the named executor.</LI>
  * <LI>{@code table.scan.dispatcher.opts.single_executor=<scan executor name>}
: dispatches regular
  * scans to the named executor.</LI>
- * <LI>{@code table.scan.dispatcher.opts.heed_hints=true|false} : This option defaults
to false, so
- * by default execution hints are ignored. When set to true, the executor can be set on the
scanner.
- * This is done by putting the key/value {@code executor=<scan executor name>} in the
map passed to
- * {@link ScannerBase#setExecutionHints(Map)}
- * <LI>{@code table.scan.dispatcher.opts.bad_hint_action=none|log|fail} : When
- * {@code heed_hints=true}, this option determines what to do if the executor in a hint does
not
- * exist. The possible values for this option are {@code none}, {@code log}, or {@code error}.
- * Setting {@code none} will silently ignore invalid hints. Setting {@code log} will log
a warning
- * for invalid hints. Setting {@code fail} will throw an exception likely causing the scan
to fail.
- * For {@code log} and {@code none}, when there is an invalid hint it will fall back to the
table
- * configuration. The default is {@code log}.
- * <LI>{@code table.scan.dispatcher.opts.ignored_hint_action=none|log|fail} : When
- * {@code heed_hints=false}, this option determines what to do if a hint specifies an executor.
The
- * possible values for this option are {@code none}, {@code log}, or {@code fail}. The default
is
- * {@code log}.
+ * <LI>{@code table.scan.dispatcher.opts.executor.<type>=<scan executor name>}
: dispatches scans
+ * that set the hint {@code scan_type=<type>} to the named executor. If this setting
matches then it
+ * takes precedence over all other settings. See {@link ScannerBase#setExecutionHints(Map)}</LI>
  *
  * </UL>
  *
@@ -62,74 +48,52 @@ import com.google.common.collect.Sets;
 
 public class SimpleScanDispatcher implements ScanDispatcher {
 
+  private final String EXECUTOR_PREFIX = "executor.";
+
   private final Set<String> VALID_OPTS = ImmutableSet.of("executor", "multi_executor",
-      "single_executor", "heed_hints", "bad_hint_action", "ignored_hint_action");
+      "single_executor");
   private String multiExecutor;
   private String singleExecutor;
-  private boolean heedHints;
-  private HintProblemAction badHintAction;
-  private HintProblemAction ignoredHintHaction;
-
-  public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
 
-  private static final Logger log = LoggerFactory.getLogger(SimpleScanDispatcher.class);
+  private Map<String,String> typeExecutors;
 
-  private enum HintProblemAction {
-    NONE, LOG, FAIL
-  }
+  public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
 
   @Override
   public void init(InitParameters params) {
     Map<String,String> options = params.getOptions();
-    Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS);
-    Preconditions.checkArgument(invalidOpts.size() == 0, "Invalid options : %s", invalidOpts);
+
+    Builder<String,String> teb = ImmutableMap.builder();
+
+    options.forEach((k, v) -> {
+      if (k.startsWith(EXECUTOR_PREFIX)) {
+        String type = k.substring(EXECUTOR_PREFIX.length());
+        teb.put(type, v);
+      } else if (!VALID_OPTS.contains(k)) {
+        throw new IllegalArgumentException("Invalid option " + k);
+      }
+    });
+
+    typeExecutors = teb.build();
 
     String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME);
     multiExecutor = options.getOrDefault("multi_executor", base);
     singleExecutor = options.getOrDefault("single_executor", base);
-    heedHints = Boolean.parseBoolean(options.getOrDefault("heed_hints", "false"));
-    badHintAction = HintProblemAction.valueOf(
-        options.getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase());
-    ignoredHintHaction = HintProblemAction.valueOf(
-        options.getOrDefault("ignored_hint_action", HintProblemAction.LOG.name()).toUpperCase());
+
   }
 
   @Override
   public String dispatch(DispatchParmaters params) {
     ScanInfo scanInfo = params.getScanInfo();
-    if (heedHints) {
-      String executor = scanInfo.getExecutionHints().get("executor");
-      if (executor != null) {
-        if (params.getScanExecutors().containsKey(executor)) {
+
+    if (!typeExecutors.isEmpty()) {
+      String scanType = scanInfo.getExecutionHints().get("scan_type");
+      if (scanType != null) {
+        String executor = typeExecutors.get(scanType);
+        if (executor != null) {
           return executor;
-        } else {
-          switch (badHintAction) {
-            case FAIL:
-              throw new IllegalArgumentException(
-                  "Scan execution hint contained unknown executor " + executor);
-            case LOG:
-              log.warn("Scan execution hint contained unknown executor {} ", executor);
-              break;
-            case NONE:
-              break;
-            default:
-              throw new IllegalStateException();
-          }
         }
       }
-    } else if (ignoredHintHaction != HintProblemAction.NONE
-        && scanInfo.getExecutionHints().containsKey("executor")) {
-      String executor = scanInfo.getExecutionHints().get("executor");
-      switch (ignoredHintHaction) {
-        case FAIL:
-          throw new IllegalArgumentException(
-              "Scan execution hint contained executor " + executor + " when heed_hints=false");
-        case LOG:
-          log.warn("Scan execution hint contained executor {} when heed_hints=false", executor);
-          break;
-        default:
-          throw new IllegalStateException();
-      }
     }
 
     switch (scanInfo.getScanType()) {
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java
new file mode 100644
index 0000000..0faa2f0
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class HintScanPrioritizerTest {
+  @Test
+  public void testSort() {
+    long now = System.currentTimeMillis();
+
+    List<TestScanInfo> scans = new ArrayList<>();
+
+    // Two following have never run, so oldest should go first
+    scans.add(new TestScanInfo("a", Type.SINGLE, now - 7));
+    scans.add(
+        new TestScanInfo("b", Type.SINGLE, now - 3).setExecutionHints("scan_type", "background"));
+    scans.add(
+        new TestScanInfo("c", Type.SINGLE, now - 4).setExecutionHints("scan_type", "background"));
+    scans.add(new TestScanInfo("d", Type.SINGLE, now - 3).setExecutionHints("scan_type",
"isbn"));
+    scans.add(new TestScanInfo("e", Type.SINGLE, now - 5).setExecutionHints("scan_type",
"isbn"));
+    scans.add(new TestScanInfo("f", Type.SINGLE, now - 1).setExecutionHints("priority", "35"));
+    scans.add(new TestScanInfo("g", Type.SINGLE, now - 2).setExecutionHints("priority", "25"));
+    scans.add(new TestScanInfo("h", Type.SINGLE, now - 3).setExecutionHints("priority", "15"));
+    scans.add(new TestScanInfo("i", Type.SINGLE, now - 4).setExecutionHints("priority", "5"));
+
+    Collections.shuffle(scans);
+
+    Comparator<ScanInfo> comparator = new HintScanPrioritizer()
+        .createComparator(new ScanPrioritizer.CreateParameters() {
+
+          @Override
+          public Map<String,String> getOptions() {
+            return ImmutableMap.of("priority.isbn", "10", "priority.background", "30",
+                "default_priority", "20");
+          }
+
+          @Override
+          public ServiceEnvironment getServiceEnv() {
+            throw new UnsupportedOperationException();
+          }
+        });
+
+    Collections.sort(scans, comparator);
+
+    assertEquals("i", scans.get(0).testId);
+    assertEquals("e", scans.get(1).testId);
+    assertEquals("d", scans.get(2).testId);
+    assertEquals("h", scans.get(3).testId);
+    assertEquals("a", scans.get(4).testId);
+    assertEquals("g", scans.get(5).testId);
+    assertEquals("c", scans.get(6).testId);
+    assertEquals("b", scans.get(7).testId);
+    assertEquals("f", scans.get(8).testId);
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
index a35e527..432e6dd 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -125,29 +125,10 @@ public class SimpleScanDispatcherTest {
 
   @Test
   public void testHints() {
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"), ImmutableMap.of(), "E1",
"E1");
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"),
-        ImmutableMap.of("executor", "E2"), "E2", "E2");
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"),
-        ImmutableMap.of("executor", "E5"), "E1", "E1");
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "ignored_hint_action",
"fail"),
-        ImmutableMap.of("executor", "E5"), "E1", "E1");
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "bad_hint_action", "fail",
-        "ignored_hint_action", "fail"), ImmutableMap.of("executor", "E2"), "E2", "E2");
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "false"),
-        ImmutableMap.of("executor", "E2"), "E1", "E1");
-    runTest(ImmutableMap.of("executor", "E1"), ImmutableMap.of("executor", "E2"), "E1", "E1");
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testBadHint() {
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "bad_hint_action", "fail"),
-        ImmutableMap.of("executor", "E5"), "E2", "E2");
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testIgnoredHint() {
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "false", "ignored_hint_action",
"fail"),
-        ImmutableMap.of("executor", "E2"), "E1", "E1");
+    runTest(ImmutableMap.of("executor", "E1"), ImmutableMap.of("scan_type", "quick"), "E1",
"E1");
+    runTest(ImmutableMap.of("executor", "E1", "executor.quick", "E2"),
+        ImmutableMap.of("scan_type", "quick"), "E2", "E2");
+    runTest(ImmutableMap.of("executor", "E1", "executor.quick", "E2", "executor.slow", "E3"),
+        ImmutableMap.of("scan_type", "slow"), "E3", "E3");
   }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
index a9c72f8..4e14099 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
@@ -29,6 +29,8 @@ import org.apache.accumulo.core.spi.common.IteratorConfiguration;
 import org.apache.accumulo.core.spi.common.Stats;
 import org.apache.accumulo.core.util.Stat;
 
+import com.google.common.collect.ImmutableMap;
+
 public class TestScanInfo implements ScanInfo {
 
   String testId;
@@ -56,6 +58,11 @@ public class TestScanInfo implements ScanInfo {
     }
   }
 
+  TestScanInfo setExecutionHints(String k, String v) {
+    this.executionHints = ImmutableMap.of(k, v);
+    return this;
+  }
+
   @Override
   public Type getScanType() {
     return scanType;


Mime
View raw message