drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [4/7] drill git commit: DRILL-2927: Correctly timeout query if a queue doesn't deplete within expected time.
Date Thu, 07 May 2015 08:56:40 GMT
DRILL-2927: Correctly timeout query if a queue doesn't deplete within expected time.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/21992b6b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/21992b6b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/21992b6b

Branch: refs/heads/master
Commit: 21992b6b6946f42d89924725dd65e301eebc7397
Parents: 7ec9987
Author: Jacques Nadeau <jacques@apache.org>
Authored: Wed May 6 14:09:01 2015 +0100
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu May 7 00:12:20 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    | 21 +++++++-------------
 .../apache/drill/exec/work/foreman/Foreman.java | 21 +++++++++++++++-----
 2 files changed, 23 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/21992b6b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index a577815..fb764c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -194,20 +194,13 @@ public interface ExecConstants {
   public static final String AVERAGE_FIELD_WIDTH_KEY = "planner.memory.average_field_width";
   public static final OptionValidator AVERAGE_FIELD_WIDTH = new PositiveLongValidator(AVERAGE_FIELD_WIDTH_KEY,
Long.MAX_VALUE, 8);
 
-  public static final String ENABLE_QUEUE_KEY = "exec.queue.enable";
-  public static final OptionValidator ENABLE_QUEUE = new BooleanValidator(ENABLE_QUEUE_KEY,
false);
-
-  public static final String LARGE_QUEUE_KEY = "exec.queue.large";
-  public static final OptionValidator LARGE_QUEUE_SIZE = new PositiveLongValidator(LARGE_QUEUE_KEY,
1000, 10);
-
-  public static final String SMALL_QUEUE_KEY = "exec.queue.small";
-  public static final OptionValidator SMALL_QUEUE_SIZE = new PositiveLongValidator(SMALL_QUEUE_KEY,
100000, 100);
-
-  public static final String QUEUE_THRESHOLD_KEY = "exec.queue.threshold";
-  public static final OptionValidator QUEUE_THRESHOLD_SIZE = new PositiveLongValidator(QUEUE_THRESHOLD_KEY,
Long.MAX_VALUE, 30000000);
-
-  public static final String QUEUE_TIMEOUT_KEY = "exec.queue.timeout_millis";
-  public static final OptionValidator QUEUE_TIMEOUT = new PositiveLongValidator(QUEUE_TIMEOUT_KEY,
Long.MAX_VALUE, 60*1000*5);
+  public static final BooleanValidator ENABLE_QUEUE = new BooleanValidator("exec.queue.enable",
false);
+  public static final LongValidator LARGE_QUEUE_SIZE = new PositiveLongValidator("exec.queue.large",
1000, 10);
+  public static final LongValidator SMALL_QUEUE_SIZE = new PositiveLongValidator("exec.queue.small",
100000, 100);
+  public static final LongValidator QUEUE_THRESHOLD_SIZE = new PositiveLongValidator("exec.queue.threshold",
+      Long.MAX_VALUE, 30000000);
+  public static final LongValidator QUEUE_TIMEOUT = new PositiveLongValidator("exec.queue.timeout_millis",
+      Long.MAX_VALUE, 60 * 1000 * 5);
 
   public static final String ENABLE_VERBOSE_ERRORS_KEY = "exec.errors.verbose";
   public static final OptionValidator ENABLE_VERBOSE_ERRORS = new BooleanValidator(ENABLE_VERBOSE_ERRORS_KEY,
false);

http://git-wip-us.apache.org/repos/asf/drill/blob/21992b6b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d678cc5..49d0c94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -406,14 +406,16 @@ public class Foreman implements Runnable {
    */
   private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException
{
     final OptionManager optionManager = queryContext.getOptions();
-    final boolean queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val;
+    final boolean queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE);
     if (queuingEnabled) {
-      final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
+      final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
       double totalCost = 0;
       for (final PhysicalOperator ops : plan.getSortedOperators()) {
         totalCost += ops.getCost();
       }
 
+      final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
+
       try {
         @SuppressWarnings("resource")
         final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
@@ -421,18 +423,27 @@ public class Foreman implements Runnable {
 
         // get the appropriate semaphore
         if (totalCost > queueThreshold) {
-          final int largeQueue = optionManager.getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue();
+          final int largeQueue = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
           distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue);
         } else {
-          final int smallQueue = optionManager.getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue();
+          final int smallQueue = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
           distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue);
         }
 
-        final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
+
         lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
       } catch (final Exception e) {
         throw new ForemanSetupException("Unable to acquire slot for query.", e);
       }
+
+      if (lease == null) {
+        throw UserException
+            .resourceError()
+            .message("Unable to acquire queue resources for query within timeout.  Timeout
was set at %d seconds.",
+                queueTimeout / 1000)
+            .build();
+      }
+
     }
   }
 


Mime
View raw message