drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prog...@apache.org
Subject [3/3] drill git commit: DRILL-5716: Queue-driven memory allocation
Date Mon, 09 Oct 2017 22:58:51 GMT
DRILL-5716: Queue-driven memory allocation

* Creates new core resource management and query queue abstractions.
* Adds queue information to the Protobuf layer.
* Foreman and Planner changes

- Abstracts memory management out to the new resource management layer.
This means deferring generating the physical plan JSON to later in the
process after memory planning.

* Web UI changes

* Adds queue information to the main page and the profile page to each
query.
* Also sorts the list of options displayed in the Web UI.

- Added memory reserve

A new config parameter, exec.queue.memory_reserve_ratio, sets aside a
slice of total memory for operators that do not participate in the
memory assignment process. The default is 20% testing will tell us if
that value should be larger or smaller.

* Additional minor fixes

- Code cleanup.
- Added mechanism to abandon lease release during shutdown.
- Log queue configuration only when the config changes, rather than on
every query.
- Apply Boaz’ option to enforce a minimum memory allocation per
operator.
- Additional logging to help testers see what is happening.

closes #928


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bbc42240
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bbc42240
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bbc42240

Branch: refs/heads/master
Commit: bbc42240483a0658691149aea3c509ccd0db4c79
Parents: a03f542
Author: Paul Rogers <progers@maprtech.com>
Authored: Wed Aug 30 14:32:17 2017 -0700
Committer: Paul Rogers <progers@maprtech.com>
Committed: Mon Oct 9 15:58:20 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  25 +-
 .../exec/coord/store/TransientStoreConfig.java  |   2 +-
 .../org/apache/drill/exec/ops/QueryContext.java |  13 +-
 .../drill/exec/physical/PhysicalPlan.java       |  16 +-
 .../drill/exec/physical/base/FragmentRoot.java  |   7 +-
 .../HardAffinityFragmentParallelizer.java       |   2 +-
 .../exec/planner/fragment/Materializer.java     |   3 +-
 .../exec/planner/fragment/PlanningSet.java      |   4 +-
 .../planner/fragment/SimpleParallelizer.java    |  50 +-
 .../SoftAffinityFragmentParallelizer.java       |   3 +-
 .../fragment/contrib/SplittingParallelizer.java |  24 +-
 .../org/apache/drill/exec/server/Drillbit.java  |  10 +-
 .../drill/exec/server/DrillbitContext.java      |  40 +-
 .../server/options/DrillConfigIterator.java     |   3 +
 .../drill/exec/server/options/OptionList.java   |   1 +
 .../server/options/SessionOptionManager.java    |   9 +-
 .../server/options/SystemOptionManager.java     |   9 +-
 .../drill/exec/server/rest/DrillRestServer.java |   4 +-
 .../drill/exec/server/rest/DrillRoot.java       | 137 +++-
 .../server/rest/GenericExceptionMapper.java     |  25 +-
 .../drill/exec/server/rest/LogsResources.java   |   1 +
 .../drill/exec/server/rest/StatusResources.java |  12 +-
 .../exec/server/rest/StorageResources.java      |   3 +-
 .../drill/exec/server/rest/WebServer.java       |   4 +-
 .../exec/server/rest/profile/Comparators.java   |  13 +
 .../server/rest/profile/ProfileResources.java   |  87 ++-
 .../server/rest/profile/ProfileWrapper.java     |   1 -
 .../apache/drill/exec/util/BatchPrinter.java    |   8 +-
 .../exec/util/MemoryAllocationUtilities.java    |  19 +-
 .../apache/drill/exec/util/TestUtilities.java   |   3 +-
 .../apache/drill/exec/work/QueryWorkUnit.java   |  80 ++-
 .../org/apache/drill/exec/work/WorkManager.java |   4 +-
 .../apache/drill/exec/work/foreman/Foreman.java | 278 ++++-----
 .../drill/exec/work/foreman/QueryManager.java   |  30 +-
 .../foreman/rm/AbstractResourceManager.java     |  68 ++
 .../work/foreman/rm/DefaultResourceManager.java | 120 ++++
 .../work/foreman/rm/DistributedQueryQueue.java  | 363 +++++++++++
 .../work/foreman/rm/DynamicResourceManager.java | 146 +++++
 .../work/foreman/rm/EmbeddedQueryQueue.java     | 151 +++++
 .../drill/exec/work/foreman/rm/QueryQueue.java  | 140 +++++
 .../work/foreman/rm/QueryResourceAllocator.java |  56 ++
 .../work/foreman/rm/QueryResourceManager.java   |  73 +++
 .../exec/work/foreman/rm/ResourceManager.java   |  66 ++
 .../work/foreman/rm/ResourceManagerBuilder.java |  78 +++
 .../foreman/rm/ThrottledResourceManager.java    | 360 +++++++++++
 .../exec/work/foreman/rm/package-info.java      |  31 +
 .../drill/exec/work/user/PlanSplitter.java      |  14 +-
 .../src/main/resources/drill-module.conf        |  36 +-
 .../java-exec/src/main/resources/rest/index.ftl |  60 +-
 .../src/main/resources/rest/profile/profile.ftl |  15 +
 .../exec/physical/impl/TestLocalExchange.java   |   3 +-
 .../partitionsender/TestPartitionSender.java    |   3 +-
 .../drill/exec/pop/TestFragmentChecker.java     |   3 +-
 .../drill/exec/proto/SchemaUserBitShared.java   |  28 +
 .../apache/drill/exec/proto/UserBitShared.java  | 621 ++++++++++++++++---
 .../drill/exec/proto/beans/QueryInfo.java       |  45 ++
 .../drill/exec/proto/beans/QueryProfile.java    |  45 ++
 protocol/src/main/protobuf/UserBitShared.proto  |   6 +-
 58 files changed, 3002 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 72a73fc..9890b45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -350,7 +350,7 @@ public final class ExecConstants {
   public static final LongValidator MAX_QUERY_MEMORY_PER_NODE = new RangeLongValidator(MAX_QUERY_MEMORY_PER_NODE_KEY, 1024 * 1024, Long.MAX_VALUE);
 
   /**
-   * Minimum memory alocated to each buffered operator instance.
+   * Minimum memory allocated to each buffered operator instance.
    * <p/>
    * DEFAULT: 40 MB
    */
@@ -377,12 +377,31 @@ public final class ExecConstants {
   public static final String AVERAGE_FIELD_WIDTH_KEY = "planner.memory.average_field_width";
   public static final OptionValidator AVERAGE_FIELD_WIDTH = new PositiveLongValidator(AVERAGE_FIELD_WIDTH_KEY, Long.MAX_VALUE);
 
+  // Resource management boot-time options.
+
+  public static final String MAX_MEMORY_PER_NODE = "drill.exec.rm.memory_per_node";
+  public static final String MAX_CPUS_PER_NODE = "drill.exec.rm.cpus_per_node";
+
+  // Resource management system run-time options.
+
+  // Enables queues. When running embedded, enables an in-process queue. When
+  // running distributed, enables the Zookeeper-based distributed queue.
+
   public static final BooleanValidator ENABLE_QUEUE = new BooleanValidator("exec.queue.enable");
-  public static final LongValidator LARGE_QUEUE_SIZE = new PositiveLongValidator("exec.queue.large", 1000);
-  public static final LongValidator SMALL_QUEUE_SIZE = new PositiveLongValidator("exec.queue.small", 100000);
+  public static final LongValidator LARGE_QUEUE_SIZE = new PositiveLongValidator("exec.queue.large", 10_000);
+  public static final LongValidator SMALL_QUEUE_SIZE = new PositiveLongValidator("exec.queue.small", 100_000);
   public static final LongValidator QUEUE_THRESHOLD_SIZE = new PositiveLongValidator("exec.queue.threshold", Long.MAX_VALUE);
   public static final LongValidator QUEUE_TIMEOUT = new PositiveLongValidator("exec.queue.timeout_millis", Long.MAX_VALUE);
 
+  // Ratio of memory for small queries vs. large queries.
+  // Each small query gets 1 unit, each large query gets QUEUE_MEMORY_RATIO units.
+  // A lower limit of 1 enforces the intuition that a large query should never get
+  // *less* memory than a small one.
+
+  public static final DoubleValidator QUEUE_MEMORY_RATIO = new RangeDoubleValidator("exec.queue.memory_ratio", 1.0, 1000);
+
+  public static final DoubleValidator QUEUE_MEMORY_RESERVE = new RangeDoubleValidator("exec.queue.memory_reserve_ratio", 0, 1.0);
+
   public static final String ENABLE_VERBOSE_ERRORS_KEY = "exec.errors.verbose";
   public static final OptionValidator ENABLE_VERBOSE_ERRORS = new BooleanValidator(ENABLE_VERBOSE_ERRORS_KEY);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java
index 35c4a06..36f53ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java
@@ -53,6 +53,7 @@ public class TransientStoreConfig<V> {
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof TransientStoreConfig && obj.getClass().equals(getClass())) {
+      @SuppressWarnings("unchecked")
       final TransientStoreConfig<V> other = (TransientStoreConfig<V>)obj;
       return Objects.equal(name, other.name) && Objects.equal(serializer, other.serializer);
     }
@@ -70,5 +71,4 @@ public class TransientStoreConfig<V> {
   public static <V> TransientStoreConfigBuilder<V> newJacksonBuilder(final ObjectMapper mapper, final Class<V> klazz) {
     return TransientStoreConfig.<V>newBuilder().serializer(new JacksonSerializer<>(mapper, klazz));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 435f35f..86074f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -60,10 +60,10 @@ import io.netty.buffer.DrillBuf;
 // TODO - consider re-name to PlanningContext, as the query execution context actually appears
 // in fragment contexts
 public class QueryContext implements AutoCloseable, OptimizerRulesContext, SchemaConfigInfoProvider {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
 
   private final DrillbitContext drillbitContext;
   private final UserSession session;
+  private final QueryId queryId;
   private final QueryOptionManager queryOptions;
   private final PlannerSettings plannerSettings;
   private final ExecutionControls executionControls;
@@ -87,6 +87,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
   public QueryContext(final UserSession session, final DrillbitContext drillbitContext, QueryId queryId) {
     this.drillbitContext = drillbitContext;
     this.session = session;
+    this.queryId = queryId;
     queryOptions = new QueryOptionManager(session.getOptions());
     executionControls = new ExecutionControls(queryOptions, drillbitContext.getEndpoint());
     plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
@@ -118,14 +119,12 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
     return plannerSettings;
   }
 
-  public UserSession getSession() {
-    return session;
-  }
+  public UserSession getSession() { return session; }
 
   @Override
-  public BufferAllocator getAllocator() {
-    return allocator;
-  }
+  public BufferAllocator getAllocator() { return allocator; }
+
+  public QueryId getQueryId( ) { return queryId; }
 
   /**
    * Return reference to default schema instance in a schema tree. Each {@link org.apache.calcite.schema.SchemaPlus}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
index e0902c8..e5c96a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Root;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -87,4 +88,17 @@ public class PhysicalPlan {
       throw new RuntimeException(e);
     }
   }
+
+  public double totalCost() {
+    double totalCost = 0;
+    for (final PhysicalOperator ops : getSortedOperators()) {
+      totalCost += ops.getCost();
+    }
+    return totalCost;
+  }
+
+  @JsonIgnore
+  public Graph<PhysicalOperator, Root, Leaf> graph() {
+    return graph;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
index 32910c3..b972dd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,9 +17,10 @@
  */
 package org.apache.drill.exec.physical.base;
 
-
 /**
- * Describes the root operation within a particular Fragment. This includes things Sender nodes.
+ * Describes the root operation within a particular Fragment. This includes
+ * things like Sender nodes.
  */
+
 public interface FragmentRoot extends FragmentLeaf {
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
index 550dcb2..9c635e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
@@ -66,7 +66,7 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
       }
     }
 
-    // Step 1: Find the width taking into various parameters
+    // Step 1: Find the width taking into account various parameters
     // 1.1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is consistent
     //      with the calculation that ExcessiveExchangeRemover uses.
     int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index edec7e4..2fc7541 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,7 +28,6 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Store;
 import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.work.foreman.ForemanException;
 
 import com.google.common.collect.Lists;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
index 3e0f35a..1529b6b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,8 +20,6 @@ package org.apache.drill.exec.planner.fragment;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
 import com.google.common.collect.Maps;
 
 public class PlanningSet implements Iterable<Wrapper> {

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 4584bd5..d2efcfb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -46,9 +46,9 @@ import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -70,7 +70,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
 
   public SimpleParallelizer(QueryContext context) {
     OptionManager optionManager = context.getOptions();
-    long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET).num_val;
+    long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET_OPTION);
     this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1;
     double cpu_load_average = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
     final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
@@ -123,12 +123,12 @@ public class SimpleParallelizer implements ParallelizationParameters {
    * @throws ExecutionSetupException
    */
   public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
-      Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment,
+      Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
       UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
 
     final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
     return generateWorkUnit(
-        options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo);
+        options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
   }
 
   /**
@@ -150,6 +150,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
     // no op
     throw new UnsupportedOperationException("Use children classes");
   }
+
   /**
    * Helper method to reuse the code for QueryWorkUnit(s) generation
    * @param activeEndpoints
@@ -209,6 +210,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
     // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for
     // parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and
     // remove the fragment on which the current fragment depends on.
+
     final Set<Wrapper> roots = Sets.newHashSet();
     for(Wrapper w : planningSet) {
       roots.add(w);
@@ -257,11 +259,11 @@ public class SimpleParallelizer implements ParallelizationParameters {
   }
 
   protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
-      PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
+      Fragment rootNode, PlanningSet planningSet,
       UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
-    List<PlanFragment> fragments = Lists.newArrayList();
+    List<MinorFragmentDefn> fragmentDefns = new ArrayList<>( );
 
-    PlanFragment rootFragment = null;
+    MinorFragmentDefn rootFragmentDefn = null;
     FragmentRoot rootOperator = null;
 
     // now we generate all the individual plan fragments and associated assignments. Note, we need all endpoints
@@ -287,16 +289,6 @@ public class SimpleParallelizer implements ParallelizationParameters {
         Preconditions.checkArgument(op instanceof FragmentRoot);
         FragmentRoot root = (FragmentRoot) op;
 
-        // get plan as JSON
-        String plan;
-        String optionsData;
-        try {
-          plan = reader.writeJson(root);
-          optionsData = reader.writeJson(options);
-        } catch (JsonProcessingException e) {
-          throw new ForemanSetupException("Failure while trying to convert fragment into json.", e);
-        }
-
         FragmentHandle handle = FragmentHandle //
             .newBuilder() //
             .setMajorFragmentId(wrapper.getMajorFragmentId()) //
@@ -306,40 +298,36 @@ public class SimpleParallelizer implements ParallelizationParameters {
 
         PlanFragment fragment = PlanFragment.newBuilder() //
             .setForeman(foremanNode) //
-            .setFragmentJson(plan) //
             .setHandle(handle) //
             .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) //
             .setLeafFragment(isLeafFragment) //
             .setContext(queryContextInfo)
             .setMemInitial(wrapper.getInitialAllocation())//
             .setMemMax(wrapper.getMaxAllocation())
-            .setOptionsJson(optionsData)
             .setCredentials(session.getCredentials())
             .addAllCollector(CountRequiredFragments.getCollectors(root))
             .build();
 
+        MinorFragmentDefn fragmentDefn = new MinorFragmentDefn(fragment, root, options);
+
         if (isRootNode) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
-          }
-          rootFragment = fragment;
+          logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
+          rootFragmentDefn = fragmentDefn;
           rootOperator = root;
         } else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
-          }
-          fragments.add(fragment);
+          logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
+          fragmentDefns.add(fragmentDefn);
         }
       }
     }
 
-    return new QueryWorkUnit(rootOperator, rootFragment, fragments);
+    return new QueryWorkUnit(rootOperator, rootFragmentDefn, fragmentDefns);
   }
 
-
   /**
    * Designed to setup initial values for arriving fragment accounting.
    */
+
   protected static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<Collector>, RuntimeException> {
     private static final CountRequiredFragments INSTANCE = new CountRequiredFragments();
 
@@ -357,7 +345,6 @@ public class SimpleParallelizer implements ParallelizationParameters {
         list.add(ep.getId());
       }
 
-
       collectors.add(Collector.newBuilder()
         .setIsSpooling(receiver.isSpooling())
         .setOppositeMajorFragmentId(receiver.getOppositeMajorFragmentId())
@@ -374,6 +361,5 @@ public class SimpleParallelizer implements ParallelizationParameters {
       }
       return null;
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
index 644263e..1549a6b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -53,7 +53,6 @@ public class SoftAffinityFragmentParallelizer implements FragmentParallelizer {
   @Override
   public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
       final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
-    final Fragment fragment = fragmentWrapper.getNode();
 
     // Find the parallelization width of fragment
     final Stats stats = fragmentWrapper.getStats();

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
index 395a9e1..1eb1296 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
@@ -41,9 +41,9 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -115,7 +115,6 @@ public class SplittingParallelizer extends SimpleParallelizer {
     int plansCount = 0;
     DrillbitEndpoint[] endPoints = null;
     long initialAllocation = 0;
-    long maxAllocation = 0;
 
     final Iterator<Wrapper> iter = planningSet.iterator();
     while (iter.hasNext()) {
@@ -131,7 +130,6 @@ public class SplittingParallelizer extends SimpleParallelizer {
         // allocation
         plansCount = wrapper.getWidth();
         initialAllocation = (wrapper.getInitialAllocation() != 0 ) ? wrapper.getInitialAllocation()/plansCount : 0;
-        maxAllocation = (wrapper.getMaxAllocation() != 0 ) ? wrapper.getMaxAllocation()/plansCount : 0;
         endPoints = new DrillbitEndpoint[plansCount];
         for (int mfId = 0; mfId < plansCount; mfId++) {
           endPoints[mfId] = wrapper.getAssignedEndpoint(mfId);
@@ -140,7 +138,7 @@ public class SplittingParallelizer extends SimpleParallelizer {
     }
     if ( plansCount == 0 ) {
       // no exchange, return list of single QueryWorkUnit
-      workUnits.add(generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet, session, queryContextInfo));
+      workUnits.add(generateWorkUnit(options, foremanNode, queryId, rootNode, planningSet, session, queryContextInfo));
       return workUnits;
     }
 
@@ -171,9 +169,9 @@ public class SplittingParallelizer extends SimpleParallelizer {
       // Create a minorFragment for each major fragment.
       for (int minorFragmentId = 0; minorFragmentId < plansCount; minorFragmentId++) {
         // those fragments should be empty
-        List<PlanFragment> fragments = Lists.newArrayList();
+        List<MinorFragmentDefn> fragments = Lists.newArrayList();
 
-        PlanFragment rootFragment = null;
+        MinorFragmentDefn rootFragment = null;
         FragmentRoot rootOperator = null;
 
         IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
@@ -186,35 +184,25 @@ public class SplittingParallelizer extends SimpleParallelizer {
         Preconditions.checkArgument(op instanceof FragmentRoot);
         FragmentRoot root = (FragmentRoot) op;
 
-        // get plan as JSON
-        String plan;
-        String optionsData;
-        try {
-          plan = reader.writeJson(root);
-          optionsData = reader.writeJson(options);
-        } catch (JsonProcessingException e) {
-          throw new ForemanSetupException("Failure while trying to convert fragment into json.", e);
-        }
 
         PlanFragment fragment = PlanFragment.newBuilder() //
             .setForeman(endPoints[minorFragmentId]) //
-            .setFragmentJson(plan) //
             .setHandle(handle) //
             .setAssignment(endPoints[minorFragmentId]) //
             .setLeafFragment(isLeafFragment) //
             .setContext(queryContextInfo)
             .setMemInitial(initialAllocation)//
             .setMemMax(wrapper.getMaxAllocation()) // TODO - for some reason OOM is using leaf fragment max allocation divided by width
-            .setOptionsJson(optionsData)
             .setCredentials(session.getCredentials())
             .addAllCollector(CountRequiredFragments.getCollectors(root))
             .build();
 
+        MinorFragmentDefn fragmentDefn = new MinorFragmentDefn(fragment, root, options);
         if (isRootNode) {
           if (logger.isDebugEnabled()) {
             logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
           }
-          rootFragment = fragment;
+          rootFragment = fragmentDefn;
           rootOperator = root;
         } else {
           if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 80aaf0d..a333ff2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -103,6 +103,7 @@ public class Drillbit implements AutoCloseable {
     this(config, SystemOptionManager.createDefaultOptionDefinitions(), serviceSet, classpathScan);
   }
 
+  @SuppressWarnings("resource")
   @VisibleForTesting
   public Drillbit(
     final DrillConfig config,
@@ -122,7 +123,7 @@ public class Drillbit implements AutoCloseable {
       storeProvider = new CachingPersistentStoreProvider(new LocalPersistentStoreProvider(config));
     } else {
       coord = new ZKClusterCoordinator(config);
-      storeProvider = new PersistentStoreRegistry(this.coord, config).newPStoreProvider();
+      storeProvider = new PersistentStoreRegistry<ClusterCoordinator>(this.coord, config).newPStoreProvider();
       isDistributedMode = true;
     }
 
@@ -157,6 +158,7 @@ public class Drillbit implements AutoCloseable {
     }
     final DrillbitEndpoint md = engine.start();
     manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider, profileStoreProvider);
+    @SuppressWarnings("resource")
     final DrillbitContext drillbitContext = manager.getContext();
     storageRegistry = drillbitContext.getStorage();
     storageRegistry.init();
@@ -166,6 +168,10 @@ public class Drillbit implements AutoCloseable {
     registrationHandle = coord.register(md);
     webServer.start();
 
+    // Must start the RM after the above since it needs to read system options.
+
+    drillbitContext.startRM();
+
     Runtime.getRuntime().addShutdownHook(new ShutdownThread(this, new StackTrace()));
     logger.info("Startup completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
   }
@@ -224,6 +230,7 @@ public class Drillbit implements AutoCloseable {
       return;
     }
 
+    @SuppressWarnings("resource")
     final SystemOptionManager optionManager = getContext().getOptionManager();
 
     // parse out the properties, validate, and then set them
@@ -324,7 +331,6 @@ public class Drillbit implements AutoCloseable {
     return start(config, SystemOptionManager.createDefaultOptionDefinitions(), remoteServiceSet);
   }
 
-  @SuppressWarnings("resource")
   @VisibleForTesting
   public static Drillbit start(final DrillConfig config, final CaseInsensitiveMap<OptionDefinition> validators,
                                final RemoteServiceSet remoteServiceSet)

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 426b9d2..b8a8b1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -39,6 +39,8 @@ import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.work.foreman.rm.ResourceManager;
+import org.apache.drill.exec.work.foreman.rm.ResourceManagerBuilder;
 
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
@@ -66,6 +68,7 @@ public class DrillbitContext implements AutoCloseable {
   // operator table for standard SQL operators and functions, Drill built-in UDFs
   private final DrillOperatorTable table;
   private final QueryProfileStoreContext profileStoreContext;
+  private ResourceManager resourceManager;
 
   public DrillbitContext(
       DrillbitEndpoint endpoint,
@@ -88,7 +91,7 @@ public class DrillbitContext implements AutoCloseable {
       WorkEventBus workBus,
       PersistentStoreProvider provider,
       PersistentStoreProvider profileStoreProvider) {
-    this.classpathScan = context.getClasspathScan();
+    classpathScan = context.getClasspathScan();
     this.workBus = workBus;
     this.controller = checkNotNull(controller);
     this.context = checkNotNull(context);
@@ -96,29 +99,40 @@ public class DrillbitContext implements AutoCloseable {
     this.connectionsPool = checkNotNull(connectionsPool);
     this.endpoint = checkNotNull(endpoint);
     this.provider = provider;
-    this.lpPersistence = new LogicalPlanPersistence(context.getConfig(), classpathScan);
+    DrillConfig config = context.getConfig();
+    lpPersistence = new LogicalPlanPersistence(config, classpathScan);
 
-    // TODO remove escaping "this".
-    this.storagePlugins = context.getConfig()
+    storagePlugins = config
         .getInstance(StoragePluginRegistry.STORAGE_PLUGIN_REGISTRY_IMPL, StoragePluginRegistry.class, this);
 
-    this.reader = new PhysicalPlanReader(context.getConfig(), classpathScan, lpPersistence, endpoint, storagePlugins);
-    this.operatorCreatorRegistry = new OperatorCreatorRegistry(classpathScan);
-    this.systemOptions = new SystemOptionManager(lpPersistence, provider, context.getConfig(), context.getDefinitions());
-    this.functionRegistry = new FunctionImplementationRegistry(context.getConfig(), classpathScan, systemOptions);
-    this.compiler = new CodeCompiler(context.getConfig(), systemOptions);
+    reader = new PhysicalPlanReader(config, classpathScan, lpPersistence, endpoint, storagePlugins);
+    operatorCreatorRegistry = new OperatorCreatorRegistry(classpathScan);
+    systemOptions = new SystemOptionManager(lpPersistence, provider, config, context.getDefinitions());
+    functionRegistry = new FunctionImplementationRegistry(config, classpathScan, systemOptions);
+    compiler = new CodeCompiler(config, systemOptions);
 
     // This operator table is built once and used for all queries which do not need dynamic UDF support.
-    this.table = new DrillOperatorTable(functionRegistry, systemOptions);
+    table = new DrillOperatorTable(functionRegistry, systemOptions);
 
     //This profile store context is built from the profileStoreProvider
-    this.profileStoreContext = new QueryProfileStoreContext(context.getConfig(), profileStoreProvider, coord);
+    profileStoreContext = new QueryProfileStoreContext(context.getConfig(), profileStoreProvider, coord);
   }
 
   public QueryProfileStoreContext getProfileStoreContext() {
     return profileStoreContext;
   }
 
+  /**
+   * Starts the resource manager. Must be called separately from the
+   * constructor after the system property mechanism is initialized
+   * since the builder will consult system options to determine the
+   * proper RM to use.
+   */
+
+  public void startRM() {
+    resourceManager = new ResourceManagerBuilder(this).build();
+  }
+
   public FunctionImplementationRegistry getFunctionImplementationRegistry() {
     return functionRegistry;
   }
@@ -243,4 +257,8 @@ public class DrillbitContext implements AutoCloseable {
     getRemoteFunctionRegistry().close();
     getCompiler().close();
   }
+
+  public ResourceManager getResourceManager() {
+    return resourceManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
index f2d352c..7d30b5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
@@ -73,6 +73,9 @@ public class DrillConfigIterator implements Iterable<OptionValue> {
 
       case NULL:
         throw new IllegalStateException("Config value \"" + name + "\" has NULL type");
+
+      default:
+        throw new IllegalStateException("Unknown type: " + cv.valueType());
       }
 
       return optionValue;

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
index 8851e1a..e99645d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.server.options;
 
 import java.util.ArrayList;
 
+@SuppressWarnings("serial")
 public class OptionList extends ArrayList<OptionValue>{
 
   public void merge(OptionList list){

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
index 52bf403..635a1ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -17,14 +17,15 @@
  */
 package org.apache.drill.exec.server.options;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
+import java.util.Collection;
+import java.util.Map;
+
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.rpc.user.UserSession;
 
-import java.util.Collection;
-import java.util.Map;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 
 /**
  * {@link OptionManager} that holds options within {@link org.apache.drill.exec.rpc.user.UserSession} context. Options

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index b0863ee..bda6033 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -160,6 +160,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.QUEUE_THRESHOLD_SIZE),
       new OptionDefinition(ExecConstants.QUEUE_TIMEOUT),
       new OptionDefinition(ExecConstants.SMALL_QUEUE_SIZE),
+      new OptionDefinition(ExecConstants.QUEUE_MEMORY_RESERVE, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
+      new OptionDefinition(ExecConstants.QUEUE_MEMORY_RATIO, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.MIN_HASH_TABLE_SIZE),
       new OptionDefinition(ExecConstants.MAX_HASH_TABLE_SIZE),
       new OptionDefinition(ExecConstants.EARLY_LIMIT0_OPT),
@@ -418,6 +420,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
 
   @Override
   public void close() throws Exception {
-    options.close();
+    // If the server exits very early, the options may not yet have
+    // been created. Gracefully handle that case.
+
+    if (options != null) {
+      options.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 238f5ca..6eb47e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -86,7 +86,6 @@ public class DrillRestServer extends ResourceConfig {
     register(MultiPartFeature.class);
     property(ServerProperties.METAINF_SERVICES_LOOKUP_DISABLE, true);
 
-
     final boolean isAuthEnabled =
         workManager.getContext().getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED);
 
@@ -152,7 +151,6 @@ public class DrillRestServer extends ResourceConfig {
     return configuration;
   }
 
-
   public static class AuthWebUserConnectionProvider implements Factory<WebUserConnection> {
 
     @Inject
@@ -161,6 +159,7 @@ public class DrillRestServer extends ResourceConfig {
     @Inject
     WorkManager workManager;
 
+    @SuppressWarnings("resource")
     @Override
     public WebUserConnection provide() {
       final HttpSession session = request.getSession();
@@ -228,6 +227,7 @@ public class DrillRestServer extends ResourceConfig {
     @Inject
     WorkManager workManager;
 
+    @SuppressWarnings("resource")
     @Override
     public WebUserConnection provide() {
       final DrillbitContext drillbitContext = workManager.getContext();

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
index 84c471e..70b82ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,8 +32,15 @@ import com.google.common.collect.Sets;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
 import org.apache.drill.exec.work.WorkManager;
+import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue;
+import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.ZKQueueInfo;
+import org.apache.drill.exec.work.foreman.rm.DynamicResourceManager;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue;
+import org.apache.drill.exec.work.foreman.rm.ResourceManager;
+import org.apache.drill.exec.work.foreman.rm.ThrottledResourceManager;
 import org.glassfish.jersey.server.mvc.Viewable;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -53,6 +60,7 @@ public class DrillRoot {
     return ViewableWithPermissions.create(authEnabled.get(), "/rest/index.ftl", sc, getClusterInfoJSON());
   }
 
+  @SuppressWarnings("resource")
   @GET
   @Path("/cluster.json")
   @Produces(MediaType.APPLICATION_JSON)
@@ -60,10 +68,11 @@ public class DrillRoot {
     final Collection<DrillbitInfo> drillbits = Sets.newTreeSet();
     final Collection<String> mismatchedVersions = Sets.newTreeSet();
 
-    final DrillbitEndpoint currentDrillbit = work.getContext().getEndpoint();
+    final DrillbitContext dbContext = work.getContext();
+    final DrillbitEndpoint currentDrillbit = dbContext.getEndpoint();
     final String currentVersion = currentDrillbit.getVersion();
 
-    final DrillConfig config = work.getContext().getConfig();
+    final DrillConfig config = dbContext.getConfig();
     final boolean userEncryptionEnabled = config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED);
     final boolean bitEncryptionEnabled = config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED);
 
@@ -77,8 +86,90 @@ public class DrillRoot {
       drillbits.add(drillbit);
     }
 
-    return new ClusterInfo(drillbits, currentVersion, mismatchedVersions,
-      userEncryptionEnabled, bitEncryptionEnabled);
+     return new ClusterInfo(drillbits, currentVersion, mismatchedVersions,
+      userEncryptionEnabled, bitEncryptionEnabled,
+      QueueInfo.build(dbContext.getResourceManager()));
+  }
+
+  /**
+   * Pretty-printing wrapper class around the ZK-based queue summary.
+   */
+
+  @XmlRootElement
+  public static class QueueInfo {
+    private final ZKQueueInfo zkQueueInfo;
+
+    public static QueueInfo build(ResourceManager rm) {
+
+      // Consider queues enabled only if the ZK-based queues are in use.
+
+      ThrottledResourceManager throttledRM = null;
+      if (rm != null && rm instanceof DynamicResourceManager) {
+        DynamicResourceManager dynamicRM = (DynamicResourceManager) rm;
+        rm = dynamicRM.activeRM();
+      }
+      if (rm != null && rm instanceof ThrottledResourceManager) {
+        throttledRM = (ThrottledResourceManager) rm;
+      }
+      if (throttledRM == null) {
+        return new QueueInfo(null);
+      }
+      QueryQueue queue = throttledRM.queue();
+      if (queue == null || !(queue instanceof DistributedQueryQueue)) {
+        return new QueueInfo(null);
+      }
+
+      return new QueueInfo(((DistributedQueryQueue) queue).getInfo());
+    }
+
+    @JsonCreator
+    public QueueInfo(ZKQueueInfo queueInfo) {
+      zkQueueInfo = queueInfo;
+    }
+
+    public boolean isEnabled() { return zkQueueInfo != null; }
+
+    public int smallQueueSize() {
+      return isEnabled() ? zkQueueInfo.smallQueueSize : 0;
+    }
+
+    public int largeQueueSize() {
+      return isEnabled() ? zkQueueInfo.largeQueueSize : 0;
+    }
+
+    public String threshold() {
+      return isEnabled()
+          ? Double.toString(zkQueueInfo.queueThreshold)
+          : "N/A";
+    }
+
+    public String smallQueueMemory() {
+      return isEnabled()
+          ? toBytes(zkQueueInfo.memoryPerSmallQuery)
+          : "N/A";
+    }
+
+    public String largeQueueMemory() {
+      return isEnabled()
+          ? toBytes(zkQueueInfo.memoryPerLargeQuery)
+          : "N/A";
+    }
+
+    public String totalMemory() {
+      return isEnabled()
+          ? toBytes(zkQueueInfo.memoryPerNode)
+          : "N/A";
+    }
+
+    private final long ONE_MB = 1024 * 1024;
+
+    private String toBytes(long memory) {
+      if (memory < 10 * ONE_MB) {
+        return String.format("%,d bytes", memory);
+      } else {
+        return String.format("%,.0f MB", memory * 1.0D / ONE_MB);
+      }
+    }
   }
 
   @XmlRootElement
@@ -88,18 +179,21 @@ public class DrillRoot {
     private final Collection<String> mismatchedVersions;
     private final boolean userEncryptionEnabled;
     private final boolean bitEncryptionEnabled;
+    private final QueueInfo queueInfo;
 
     @JsonCreator
     public ClusterInfo(Collection<DrillbitInfo> drillbits,
                        String currentVersion,
                        Collection<String> mismatchedVersions,
                        boolean userEncryption,
-                       boolean bitEncryption) {
+                       boolean bitEncryption,
+                       QueueInfo queueInfo) {
       this.drillbits = Sets.newTreeSet(drillbits);
       this.currentVersion = currentVersion;
       this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions);
       this.userEncryptionEnabled = userEncryption;
       this.bitEncryptionEnabled = bitEncryption;
+      this.queueInfo = queueInfo;
     }
 
     public Collection<DrillbitInfo> getDrillbits() {
@@ -117,6 +211,8 @@ public class DrillRoot {
     public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; }
 
     public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; }
+
+    public QueueInfo queueInfo() { return queueInfo; }
   }
 
   public static class DrillbitInfo implements Comparable<DrillbitInfo> {
@@ -139,9 +235,7 @@ public class DrillRoot {
       this.versionMatch = versionMatch;
     }
 
-    public String getAddress() {
-      return address;
-    }
+    public String getAddress() { return address; }
 
     public String getUserPort() { return userPort; }
 
@@ -149,26 +243,20 @@ public class DrillRoot {
 
     public String getDataPort() { return dataPort; }
 
-    public String getVersion() {
-      return version;
-    }
+    public String getVersion() { return version; }
 
-    public boolean isCurrent() {
-      return current;
-    }
+    public boolean isCurrent() { return current; }
 
-    public boolean isVersionMatch() {
-      return versionMatch;
-    }
+    public boolean isVersionMatch() { return versionMatch; }
 
     /**
-     * Method used to sort drillbits. Current drillbit goes first.
-     * Then drillbits with matching versions, after them drillbits with mismatching versions.
-     * Matching drillbits are sorted according address natural order,
-     * mismatching drillbits are sorted according version, address natural order.
+     * Method used to sort Drillbits. Current Drillbit goes first.
+     * Then Drillbits with matching versions, after them Drillbits with mismatching versions.
+     * Matching Drillbits are sorted according address natural order,
+     * mismatching Drillbits are sorted according version, address natural order.
      *
-     * @param drillbitToCompare drillbit to compare against
-     * @return -1 if drillbit should be before, 1 if after in list
+     * @param drillbitToCompare Drillbit to compare against
+     * @return -1 if Drillbit should be before, 1 if after in list
      */
     @Override
     public int compareTo(DrillbitInfo drillbitToCompare) {
@@ -189,5 +277,4 @@ public class DrillRoot {
       return this.versionMatch ? -1 : 1;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java
index cc00e9b..7bf7c90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java
@@ -20,22 +20,21 @@ package org.apache.drill.exec.server.rest;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
-import javax.ws.rs.ext.Provider;
 
 public class GenericExceptionMapper implements ExceptionMapper<Throwable> {
-    @Override
-    public Response toResponse(Throwable throwable) {
-        return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
-                .entity(new GenericErrorMessage(throwable.getMessage()))
-                .type(MediaType.APPLICATION_JSON_TYPE)
-                .build();
-    }
+  @Override
+  public Response toResponse(Throwable throwable) {
+    return Response
+        .status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
+        .entity(new GenericErrorMessage(throwable.getMessage()))
+        .type(MediaType.APPLICATION_JSON_TYPE).build();
+  }
 
-    public static class GenericErrorMessage {
-        public final String errorMessage;
+  public static class GenericErrorMessage {
+    public final String errorMessage;
 
-        public GenericErrorMessage(String errorMessage) {
-            this.errorMessage = errorMessage;
-        }
+    public GenericErrorMessage(String errorMessage) {
+      this.errorMessage = errorMessage;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
index 16d213a..d24f03a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
@@ -111,6 +111,7 @@ public class LogsResources {
     final int maxLines = work.getContext().getOptionManager().getOption(ExecConstants.WEB_LOGS_MAX_LINES).num_val.intValue();
 
     try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+      @SuppressWarnings("serial")
       Map<Integer, String> cache = new LinkedHashMap<Integer, String>(maxLines, .75f, true) {
         @Override
         protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java
index 4042a81..e6b116c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.server.rest;
 
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -80,6 +82,7 @@ public class StatusResources {
     return ViewableWithPermissions.create(authEnabled.get(), "/rest/status.ftl", sc, getStatusJSON());
   }
 
+  @SuppressWarnings("resource")
   private List<OptionWrapper> getSystemOptionsJSONHelper(boolean internal)
   {
     List<OptionWrapper> options = new LinkedList<>();
@@ -90,6 +93,12 @@ public class StatusResources {
       options.add(new OptionWrapper(option.name, option.getValue(), option.accessibleScopes, option.kind, option.scope));
     }
 
+    Collections.sort(options, new Comparator<OptionWrapper>() {
+      @Override
+      public int compare(OptionWrapper o1, OptionWrapper o2) {
+         return o1.name.compareTo(o2.name);
+      }
+    });
     return options;
   }
 
@@ -132,6 +141,7 @@ public class StatusResources {
     return getSystemOptionsHelper(true);
   }
 
+  @SuppressWarnings("resource")
   @POST
   @Path("option/{optionName}")
   @RolesAllowed(DrillUserPrincipal.ADMIN_ROLE)

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index ce26a31..cf5eb57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -93,6 +93,7 @@ public class StorageResources {
     return ViewableWithPermissions.create(authEnabled.get(), "/rest/storage/list.ftl", sc, list);
   }
 
+  @SuppressWarnings("resource")
   @GET
   @Path("/storage/{name}.json")
   @Produces(MediaType.APPLICATION_JSON)

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index 7c376f7..69f2cab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -138,6 +138,7 @@ public class WebServer implements AutoCloseable {
    * Start the web server including setup.
    * @throws Exception
    */
+  @SuppressWarnings("resource")
   public void start() throws Exception {
     if (!config.getBoolean(ExecConstants.HTTP_ENABLE)) {
       return;
@@ -266,6 +267,7 @@ public class WebServer implements AutoCloseable {
         }
 
         // Clear all the resources allocated for this session
+        @SuppressWarnings("resource")
         final WebSessionResources webSessionResources =
             (WebSessionResources) session.getAttribute(WebSessionResources.class.getSimpleName());
 
@@ -322,7 +324,7 @@ public class WebServer implements AutoCloseable {
    * Create an HTTPS connector for given jetty server instance. If the admin has specified keystore/truststore settings
    * they will be used else a self-signed certificate is generated and used.
    *
-   * @return Initialized {@link ServerConnector} for HTTPS connectios.
+   * @return Initialized {@link ServerConnector} for HTTPS connections.
    * @throws Exception
    */
   private ServerConnector createHttpsConnector(int port) throws Exception {

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
index fa2f43f..a61522a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
@@ -31,78 +31,91 @@ import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 interface Comparators {
 
   final static Comparator<MajorFragmentProfile> majorId = new Comparator<MajorFragmentProfile>() {
+    @Override
     public int compare(final MajorFragmentProfile o1, final MajorFragmentProfile o2) {
       return Long.compare(o1.getMajorFragmentId(), o2.getMajorFragmentId());
     }
   };
 
   final static Comparator<MinorFragmentProfile> minorId = new Comparator<MinorFragmentProfile>() {
+    @Override
     public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getMinorFragmentId(), o2.getMinorFragmentId());
     }
   };
 
   final static Comparator<MinorFragmentProfile> startTime = new Comparator<MinorFragmentProfile>() {
+    @Override
     public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getStartTime(), o2.getStartTime());
     }
   };
 
   final static Comparator<MinorFragmentProfile> lastUpdate = new Comparator<MinorFragmentProfile>() {
+    @Override
     public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getLastUpdate(), o2.getLastUpdate());
     }
   };
 
   final static Comparator<MinorFragmentProfile> lastProgress = new Comparator<MinorFragmentProfile>() {
+    @Override
     public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getLastProgress(), o2.getLastProgress());
     }
   };
 
   final static Comparator<MinorFragmentProfile> endTime = new Comparator<MinorFragmentProfile>() {
+    @Override
     public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getEndTime(), o2.getEndTime());
     }
   };
 
   final static Comparator<MinorFragmentProfile> fragmentPeakMemory = new Comparator<MinorFragmentProfile>() {
+    @Override
     public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getMaxMemoryUsed(), o2.getMaxMemoryUsed());
     }
   };
 
   final static Comparator<MinorFragmentProfile> runTime = new Comparator<MinorFragmentProfile>() {
+    @Override
     public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getEndTime() - o1.getStartTime(), o2.getEndTime() - o2.getStartTime());
     }
   };
 
   final static Comparator<OperatorProfile> operatorId = new Comparator<OperatorProfile>() {
+    @Override
     public int compare(final OperatorProfile o1, final OperatorProfile o2) {
       return Long.compare(o1.getOperatorId(), o2.getOperatorId());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> setupTime = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
     public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getSetupNanos(), o2.getLeft().getSetupNanos());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> processTime = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
     public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getProcessNanos(), o2.getLeft().getProcessNanos());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> waitTime = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
     public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getWaitNanos(), o2.getLeft().getWaitNanos());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> operatorPeakMemory = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
     public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getPeakLocalMemoryAllocated(), o2.getLeft().getPeakLocalMemoryAllocated());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
index 16c07d2..875c96e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -73,17 +73,20 @@ public class ProfileResources {
   public static class ProfileInfo implements Comparable<ProfileInfo> {
     public static final SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
 
-    private String queryId;
-    private long startTime;
-    private long endTime;
-    private Date time;
-    private String link;
-    private String foreman;
-    private String query;
-    private String state;
-    private String user;
-
-    public ProfileInfo(DrillConfig drillConfig, String queryId, long startTime, long endTime, String foreman, String query, String state, String user) {
+    private final String queryId;
+    private final long startTime;
+    private final long endTime;
+    private final Date time;
+    private final String link;
+    private final String foreman;
+    private final String query;
+    private final String state;
+    private final String user;
+    private final double totalCost;
+    private final String queueName;
+
+    public ProfileInfo(DrillConfig drillConfig, String queryId, long startTime, long endTime, String foreman, String query,
+                       String state, String user, double totalCost, String queueName) {
       this.queryId = queryId;
       this.startTime = startTime;
       this.endTime = endTime;
@@ -93,52 +96,36 @@ public class ProfileResources {
       this.query = query.substring(0,  Math.min(query.length(), 150));
       this.state = state;
       this.user = user;
+      this.totalCost = totalCost;
+      this.queueName = queueName;
     }
 
-    public String getUser() {
-      return user;
-    }
+    public String getUser() { return user; }
 
-    public String getQuery(){
-      return query;
-    }
+    public String getQuery() { return query; }
 
-    public String getQueryId() {
-      return queryId;
-    }
+    public String getQueryId() { return queryId; }
 
-    public String getTime() {
-      return format.format(time);
-    }
+    public String getTime() { return format.format(time); }
 
-    public long getStartTime() {
-      return startTime;
-    }
+    public long getStartTime() { return startTime; }
 
-    public long getEndTime() {
-      return endTime;
-    }
+    public long getEndTime() { return endTime; }
 
     public String getDuration() {
       return (new SimpleDurationFormat(startTime, endTime)).verbose();
     }
 
-    public String getState() {
-      return state;
-    }
+    public String getState() { return state; }
 
-    public String getLink() {
-      return link;
-    }
+    public String getLink() { return link; }
 
     @Override
     public int compareTo(ProfileInfo other) {
       return time.compareTo(other.time);
     }
 
-    public String getForeman() {
-      return foreman;
-    }
+    public String getForeman() { return foreman; }
 
     /**
      * Generates link which will return query profile in json representation.
@@ -164,6 +151,9 @@ public class ProfileResources {
       return sb.toString();
     }
 
+    public double getTotalCost() { return totalCost; }
+
+    public String getQueueName() { return queueName; }
   }
 
   protected PersistentStoreProvider getProvider() {
@@ -204,6 +194,7 @@ public class ProfileResources {
   //max Param to cap listing of profiles
   private static final String MAX_QPROFILES_PARAM = "max";
 
+  @SuppressWarnings("resource")
   @GET
   @Path("/profiles.json")
   @Produces(MediaType.APPLICATION_JSON)
@@ -223,8 +214,11 @@ public class ProfileResources {
           final Map.Entry<String, QueryInfo> runningEntry = runningEntries.next();
           final QueryInfo profile = runningEntry.getValue();
           if (principal.canManageProfileOf(profile.getUser())) {
-            runningQueries.add(new ProfileInfo(work.getContext().getConfig(), runningEntry.getKey(), profile.getStart(), System.currentTimeMillis(), profile.getForeman()
-              .getAddress(), profile.getQuery(), profile.getState().name(), profile.getUser()));
+            runningQueries.add(
+                new ProfileInfo(work.getContext().getConfig(),
+                    runningEntry.getKey(), profile.getStart(), System.currentTimeMillis(),
+                    profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name(),
+                    profile.getUser(), profile.getTotalCost(), profile.getQueueName()));
           }
         } catch (Exception e) {
           errors.add(e.getMessage());
@@ -250,8 +244,11 @@ public class ProfileResources {
           final Map.Entry<String, QueryProfile> profileEntry = range.next();
           final QueryProfile profile = profileEntry.getValue();
           if (principal.canManageProfileOf(profile.getUser())) {
-            finishedQueries.add(new ProfileInfo(work.getContext().getConfig(), profileEntry.getKey(), profile.getStart(), profile.getEnd(), profile.getForeman().getAddress(),
-              profile.getQuery(), profile.getState().name(), profile.getUser()));
+            finishedQueries.add(
+                new ProfileInfo(work.getContext().getConfig(),
+                    profileEntry.getKey(), profile.getStart(), profile.getEnd(),
+                    profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name(),
+                    profile.getUser(), profile.getTotalCost(), profile.getQueueName()));
           }
         } catch (Exception e) {
           errors.add(e.getMessage());
@@ -277,6 +274,7 @@ public class ProfileResources {
     return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/list.ftl", sc, profiles);
   }
 
+  @SuppressWarnings("resource")
   private QueryProfile getQueryProfile(String queryId) {
     QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
 
@@ -343,7 +341,7 @@ public class ProfileResources {
     return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/profile.ftl", sc, wrapper);
   }
 
-
+  @SuppressWarnings("resource")
   @GET
   @Path("/profiles/cancel/{queryid}")
   @Produces(MediaType.TEXT_PLAIN)
@@ -372,7 +370,8 @@ public class ProfileResources {
       }
     }catch(Exception e){
       logger.debug("Failure to find query as running profile.", e);
-      return String.format("Failure attempting to cancel query %s.  Unable to find information about where query is actively running.", queryId);
+      return String.format
+          ("Failure attempting to cancel query %s.  Unable to find information about where query is actively running.", queryId);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
index 8f3cdfe..ef9ccc3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.server.rest.profile;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
index 7e32a4d..9382f4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
@@ -34,7 +34,7 @@ import com.google.common.collect.Lists;
 public class BatchPrinter {
   public static void printHyperBatch(VectorAccessible batch, SelectionVector4 sv4) {
     List<String> columns = Lists.newArrayList();
-    for (VectorWrapper vw : batch) {
+    for (VectorWrapper<?> vw : batch) {
       columns.add(vw.getValueVectors()[0].getField().getName());
     }
     int width = columns.size();
@@ -47,7 +47,7 @@ public class BatchPrinter {
         System.out.printf("|\n");
         System.out.println(StringUtils.repeat("-", width * 17 + 1));
       }
-      for (VectorWrapper vw : batch) {
+      for (VectorWrapper<?> vw : batch) {
         Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
         String value;
         if (o == null) {
@@ -67,7 +67,7 @@ public class BatchPrinter {
   public static void printBatch(VectorAccessible batch) {
     List<String> columns = Lists.newArrayList();
     List<ValueVector> vectors = Lists.newArrayList();
-    for (VectorWrapper vw : batch) {
+    for (VectorWrapper<?> vw : batch) {
       columns.add(vw.getValueVector().getField().getName());
       vectors.add(vw.getValueVector());
     }
@@ -101,7 +101,7 @@ public class BatchPrinter {
   public static void printBatch(VectorAccessible batch, SelectionVector2 sv2) {
     List<String> columns = Lists.newArrayList();
     List<ValueVector> vectors = Lists.newArrayList();
-    for (VectorWrapper vw : batch) {
+    for (VectorWrapper<?> vw : batch) {
       columns.add(vw.getValueVector().getField().getName());
       vectors.add(vw.getValueVector());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index 48724a4..6a805c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -33,7 +33,22 @@ public class MemoryAllocationUtilities {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryAllocationUtilities.class);
 
   /**
-   * Helper method to setup SortMemoryAllocations
+   * Helper method to setup Memory Allocations
+   * <p>
+   * Plan the memory for buffered operators (the only ones that can spill in this release)
+   * based on assumptions. These assumptions are the amount of memory per node to give
+   * to each query and the number of sort operators per node.
+   * <p>
+   * The reason the total
+   * memory is an assumption is that we have know knowledge of the number of queries
+   * that can run, so we need the user to tell use that information by configuring the
+   * amount of memory to be assumed available to each query.
+   * <p>
+   * The number of sorts per node could be calculated, but we instead simply take
+   * the worst case: the maximum per-query, per-node parallization and assume that
+   * all sorts appear in all fragments &mdash; a gross oversimplification, but one
+   * that Drill has long made.
+   * <p>
    * since this method can be used in multiple places adding it in this class
    * rather than keeping it in Foreman
    * @param plan
@@ -50,7 +65,7 @@ public class MemoryAllocationUtilities {
     // look for external sorts
     final List<PhysicalOperator> bufferedOpList = new LinkedList<>();
     for (final PhysicalOperator op : plan.getSortedOperators()) {
-      if ( op.isBufferedOperator() ) {
+      if (op.isBufferedOperator()) {
         bufferedOpList.add(op);
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
index 0200dc5..a9f178a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
@@ -112,7 +112,7 @@ public class TestUtilities {
    * @param jsonBatches : list of input strings, each element represent a batch. Each string could either
    *                    be in the form of "[{...}, {...}, ..., {...}]", or in the form of "{...}".
    * @param fragContext : fragment context
-   * @param columnsToRead : list of schema pathes to read from JSON reader.
+   * @param columnsToRead : list of schema paths to read from JSON reader.
    * @return
    */
   public static Iterator<RecordReader> getJsonReadersFromBatchString(List<String> jsonBatches, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
@@ -145,5 +145,4 @@ public class TestUtilities {
     }
     return readers.iterator();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
index 4f99f85..a06d46c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,39 +17,99 @@
  */
 package org.apache.drill.exec.work;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
 
 public class QueryWorkUnit {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class);
-  private final PlanFragment rootFragment; // for local
+
+  /**
+   * Definition of a minor fragment that contains the (unserialized) fragment operator
+   * tree and the (partially built) fragment. Allows the resource manager to apply
+   * memory allocations before serializing the fragments to JSON.
+   */
+
+  public static class MinorFragmentDefn {
+    private PlanFragment fragment;
+    private final FragmentRoot root;
+    private final OptionList options;
+
+    public MinorFragmentDefn(final PlanFragment fragment, final FragmentRoot root, OptionList options) {
+      this.fragment = fragment;
+      this.root = root;
+      this.options = options;
+    }
+
+    public FragmentRoot root() { return root; }
+    public PlanFragment fragment() { return fragment; }
+    public OptionList options() { return options; }
+
+    public PlanFragment applyPlan(PhysicalPlanReader reader) throws ForemanSetupException {
+      // get plan as JSON
+      try {
+        final String plan = reader.writeJson(root);
+        final String optionsData = reader.writeJson(options);
+        return PlanFragment.newBuilder(fragment)
+            .setFragmentJson(plan)
+            .setOptionsJson(optionsData)
+            .build();
+      } catch (JsonProcessingException e) {
+        throw new ForemanSetupException("Failure while trying to convert fragment into json.", e);
+      }
+    }
+  }
+
+  private PlanFragment rootFragment; // for local
+  private final MinorFragmentDefn rootFragmentDefn;
   private final FragmentRoot rootOperator; // for local
-  private final List<PlanFragment> fragments;
+  private List<PlanFragment> fragments = new ArrayList<>();
+  private final List<MinorFragmentDefn> minorFragmentDefns;
 
-  public QueryWorkUnit(final FragmentRoot rootOperator, final PlanFragment rootFragment,
-      final List<PlanFragment> fragments) {
-    Preconditions.checkNotNull(rootFragment);
-    Preconditions.checkNotNull(fragments);
+  public QueryWorkUnit(final FragmentRoot rootOperator, final MinorFragmentDefn rootFragmentDefn,
+      final List<MinorFragmentDefn> minorFragmentDefns) {
     Preconditions.checkNotNull(rootOperator);
+    Preconditions.checkNotNull(rootFragmentDefn);
+    Preconditions.checkNotNull(minorFragmentDefns);
 
-    this.rootFragment = rootFragment;
-    this.fragments = fragments;
+    this.rootFragmentDefn = rootFragmentDefn;
     this.rootOperator = rootOperator;
+    this.minorFragmentDefns = minorFragmentDefns;
   }
 
   public PlanFragment getRootFragment() {
     return rootFragment;
   }
 
+  public MinorFragmentDefn getRootFragmentDefn() {
+    return rootFragmentDefn;
+  }
+
   public List<PlanFragment> getFragments() {
     return fragments;
   }
 
+  public List<MinorFragmentDefn> getMinorFragmentDefns() {
+    return minorFragmentDefns;
+  }
+
   public FragmentRoot getRootOperator() {
     return rootOperator;
   }
+
+  public void applyPlan(PhysicalPlanReader reader) throws ForemanSetupException {
+    assert rootFragment == null;
+    rootFragment = rootFragmentDefn.applyPlan(reader);
+    assert fragments.isEmpty();
+    for (MinorFragmentDefn defn : minorFragmentDefns) {
+      fragments.add(defn.applyPlan(reader));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 800d3a7..6e560a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -149,7 +149,9 @@ public class WorkManager implements AutoCloseable {
       }
     }
 
-    getContext().close();
+    if (getContext() != null) {
+      getContext().close();
+    }
   }
 
   public DrillbitContext getContext() {


Mime
View raw message