drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [02/12] drill git commit: DRILL-5485: Remove WebServer dependency on DrillClient
Date Sat, 03 Jun 2017 04:45:57 GMT
DRILL-5485: Remove WebServer dependency on DrillClient

1. Added WebUserConnection/AnonWebUserConnection and their providers for Authenticated and Anonymous web users.
2. Updated to store the UserSession, BufferAllocator and other session states inside the HttpSession of Jetty instead
	of storing in DrillUserPrincipal. For each request now a new instance of WebUserConnection will be created. However
	for authenticated users the UserSession and other states will be re-used whereas for Anonymous Users they will be created
	for each request and later re-cycled after query execution.

close #829


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/874bf629
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/874bf629
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/874bf629

Branch: refs/heads/master
Commit: 874bf6296dcd1a42c7cf7f097c1a6b5458010cbb
Parents: d38917b
Author: Sorabh Hamirwasia <shamirwasia@maprtech.com>
Authored: Fri Apr 21 18:34:19 2017 -0700
Committer: Jinfeng Ni <jni@apache.org>
Committed: Fri Jun 2 21:43:14 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   2 +
 .../exec/ops/AccountingUserConnection.java      |   4 +-
 .../apache/drill/exec/ops/FragmentContext.java  |   2 +-
 .../apache/drill/exec/opt/BasicOptimizer.java   |   2 +-
 .../AbstractDisposableUserClientConnection.java | 107 ++++++++++++
 .../drill/exec/rpc/UserClientConnection.java    |  69 ++++++++
 .../apache/drill/exec/rpc/user/UserServer.java  |  42 +----
 .../drill/exec/server/DrillbitContext.java      |  16 +-
 .../drill/exec/server/rest/DrillRestServer.java | 149 ++++++++++++++++-
 .../drill/exec/server/rest/QueryResources.java  |  51 +++---
 .../drill/exec/server/rest/QueryWrapper.java    | 157 ++++--------------
 .../drill/exec/server/rest/WebServer.java       |  32 ++--
 .../exec/server/rest/WebSessionResources.java   |  84 ++++++++++
 .../exec/server/rest/WebUserConnection.java     | 164 +++++++++++++++++++
 .../rest/auth/AbstractDrillLoginService.java    |  95 -----------
 .../server/rest/auth/DrillRestLoginService.java |  75 +++++++--
 .../server/rest/auth/DrillUserPrincipal.java    |  82 ++--------
 .../apache/drill/exec/work/foreman/Foreman.java |   2 +-
 .../work/prepare/PreparedStatementProvider.java |  78 +++------
 .../drill/exec/work/user/PlanSplitter.java      |   2 +-
 .../apache/drill/exec/work/user/UserWorker.java |   2 +-
 .../src/main/resources/drill-module.conf        |   6 +
 .../apache/drill/exec/client/DumpCatTest.java   |   2 +-
 .../drill/exec/fn/impl/TestMathFunctions.java   |   2 +-
 .../drill/exec/fn/impl/TestMultiInputAdd.java   |   4 +-
 .../exec/fn/impl/TestNewMathFunctions.java      |  12 +-
 .../exec/fn/impl/TestRepeatedFunction.java      |   2 +-
 .../exec/physical/impl/TestCastFunctions.java   |  18 +-
 .../physical/impl/TestComparisonFunctions.java  |  16 +-
 .../physical/impl/TestConvertFunctions.java     |  12 +-
 .../impl/TestImplicitCastFunctions.java         |  10 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   2 +-
 .../physical/impl/TestReverseImplicitCast.java  |   4 +-
 .../exec/physical/impl/TestSimpleFunctions.java |   8 +-
 .../exec/physical/impl/TestStringFunctions.java |  40 ++---
 .../drill/exec/physical/impl/agg/TestAgg.java   |   2 +-
 .../physical/impl/filter/TestSimpleFilter.java  |   2 +-
 .../exec/physical/impl/join/TestHashJoin.java   |  16 +-
 .../exec/physical/impl/join/TestMergeJoin.java  |  11 +-
 .../physical/impl/limit/TestSimpleLimit.java    |  12 +-
 .../impl/project/TestSimpleProjection.java      |   2 +-
 .../exec/physical/impl/sort/TestSimpleSort.java |   2 +-
 .../impl/trace/TestTraceMultiRecordBatch.java   |   2 +-
 .../impl/trace/TestTraceOutputDump.java         |   2 +-
 .../physical/impl/union/TestSimpleUnion.java    |   4 +-
 .../drill/exec/record/TestRecordIterator.java   |   6 +-
 .../store/parquet/ParquetRecordReaderTest.java  |   4 +-
 47 files changed, 871 insertions(+), 549 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 7c681c1..18f69d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -99,6 +99,8 @@ public interface ExecConstants {
   String HTTP_CORS_ALLOWED_METHODS = "drill.exec.http.cors.allowedMethods";
   String HTTP_CORS_ALLOWED_HEADERS = "drill.exec.http.cors.allowedHeaders";
   String HTTP_CORS_CREDENTIALS = "drill.exec.http.cors.credentials";
+  String HTTP_SESSION_MEMORY_RESERVATION = "drill.exec.http.session.memory.reservation";
+  String HTTP_SESSION_MEMORY_MAXIMUM = "drill.exec.http.session.memory.maximum";
   String HTTP_SESSION_MAX_IDLE_SECS = "drill.exec.http.session_max_idle_secs";
   String HTTP_KEYSTORE_PATH = "javax.net.ssl.keyStore";
   String HTTP_KEYSTORE_PASSWORD = "javax.net.ssl.keyStorePassword";

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
index e3add13..7a01fcd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
@@ -20,10 +20,10 @@ package org.apache.drill.exec.ops;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.UserClientConnection;
 
 /**
- * Wrapper around a {@link org.apache.drill.exec.rpc.user.UserServer.UserClientConnection} that tracks the status of batches
+ * Wrapper around a {@link UserClientConnection} that tracks the status of batches
  * sent to User.
  */
 public class AccountingUserConnection {

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 8335547..badf70c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -48,7 +48,7 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.FragmentOptionManager;
 import org.apache.drill.exec.server.options.OptionList;

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 27c853a..2a378ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -53,7 +53,7 @@ import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.WindowPOP;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.apache.calcite.rel.RelFieldCollation.Direction;

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java
new file mode 100644
index 0000000..33536c6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helps to run a query and await the results. Each inheriting subclass manages the session/connection
+ * state and submits the query with respect to that state. The subclass instance lifetime is per query lifetime
+ * and is not re-used.
+ */
+public abstract class AbstractDisposableUserClientConnection implements UserClientConnection {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(AbstractDisposableUserClientConnection.class);
+
+  protected final CountDownLatch latch = new CountDownLatch(1);
+
+  protected volatile DrillPBError error;
+
+  protected volatile UserException exception;
+
+  /**
+   * Wait until the query has completed or timeout is passed.
+   *
+   * @throws InterruptedException
+   */
+  public boolean await(final long timeoutMillis) throws InterruptedException {
+    return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Wait indefinitely until the query is completed. Used only in case of WebUser
+   *
+   * @throws Exception
+   */
+  public void await() throws Exception {
+    latch.await();
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  @Override
+  public void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result) {
+
+    Preconditions.checkState(result.hasQueryState());
+
+    // Release the wait latch if the query is terminated.
+    final QueryState state = result.getQueryState();
+    final QueryId queryId = result.getQueryId();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Result arrived for QueryId: {} with QueryState: {}", QueryIdHelper.getQueryId(queryId), state);
+    }
+
+    switch (state) {
+      case FAILED:
+        error = result.getError(0);
+        exception = new UserRemoteException(error);
+        latch.countDown();
+        break;
+      case CANCELED:
+      case COMPLETED:
+        Preconditions.checkState(result.getErrorCount() == 0);
+        latch.countDown();
+        break;
+      default:
+        logger.error("Query with QueryId: {} is in unexpected state: {}", queryId, state);
+    }
+
+    // Notify the listener with ACK
+    listener.success(Acks.OK, null);
+  }
+
+  /**
+   * @return Any error returned in query execution.
+   */
+  public DrillPBError getError() {
+    return error;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
new file mode 100644
index 0000000..43247f8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.ChannelFuture;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.rpc.user.UserSession;
+
+import java.net.SocketAddress;
+
+/**
+ * Interface for getting user session properties and interacting with user connection. Separating this interface from
+ * {@link AbstractRemoteConnection} implementation for user connection:
+ * <p><ul>
+ * <li> Connection is passed to Foreman and Screen operators. Instead passing this interface exposes few details.
+ * <li> Makes it easy to have wrappers around user connection which can be helpful to tap the messages and data
+ * going to the actual client.
+ * </ul>
+ */
+public interface UserClientConnection {
+  /**
+   * @return User session object.
+   */
+  UserSession getSession();
+
+  /**
+   * Send query result outcome to client. Outcome is returned through <code>listener</code>
+   *
+   * @param listener
+   * @param result
+   */
+  void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result);
+
+  /**
+   * Send query data to client. Outcome is returned through <code>listener</code>
+   *
+   * @param listener
+   * @param result
+   */
+  void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result);
+
+  /**
+   * Returns the {@link ChannelFuture} which will be notified when this
+   * channel is closed.  This method always returns the same future instance.
+   */
+  ChannelFuture getChannelClosureFuture();
+
+  /**
+   * @return Return the client node address.
+   */
+  SocketAddress getRemoteAddress();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 543145f..35dbbe9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
 import org.apache.drill.exec.rpc.RpcConstants;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
 import org.apache.drill.exec.rpc.security.plain.PlainFactory;
 import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection;
@@ -96,47 +97,6 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
   }
 
   /**
-   * Interface for getting user session properties and interacting with user connection. Separating this interface from
-   * {@link AbstractRemoteConnection} implementation for user connection:
-   * <p><ul>
-   *   <li> Connection is passed to Foreman and Screen operators. Instead passing this interface exposes few details.
-   *   <li> Makes it easy to have wrappers around user connection which can be helpful to tap the messages and data
-   *        going to the actual client.
-   * </ul>
-   */
-  public interface UserClientConnection {
-    /**
-     * @return User session object.
-     */
-    UserSession getSession();
-
-    /**
-     * Send query result outcome to client. Outcome is returned through <code>listener</code>
-     * @param listener
-     * @param result
-     */
-    void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result);
-
-    /**
-     * Send query data to client. Outcome is returned through <code>listener</code>
-     * @param listener
-     * @param result
-     */
-    void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result);
-
-    /**
-     * Returns the {@link ChannelFuture} which will be notified when this
-     * channel is closed.  This method always returns the same future instance.
-     */
-    ChannelFuture getChannelClosureFuture();
-
-    /**
-     * @return Return the client node address.
-     */
-    SocketAddress getRemoteAddress();
-  }
-
-  /**
    * {@link AbstractRemoteConnection} implementation for user connection. Also implements {@link UserClientConnection}.
    */
   public class BitToUserConnection extends AbstractServerConnection<BitToUserConnection>

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index b8d3e68..973b97c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -17,12 +17,8 @@
  */
 package org.apache.drill.exec.server;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import com.codahale.metrics.MetricRegistry;
 import io.netty.channel.EventLoopGroup;
-
-import java.util.Collection;
-import java.util.concurrent.ExecutorService;
-
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -38,12 +34,16 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.rpc.security.AuthenticatorProvider;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 
-import com.codahale.metrics.MetricRegistry;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 public class DrillbitContext implements AutoCloseable {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
@@ -232,6 +232,10 @@ public class DrillbitContext implements AutoCloseable {
     return table;
   }
 
+  public AuthenticatorProvider getAuthProvider() {
+    return context.getAuthProvider();
+  }
+
   @Override
   public void close() throws Exception {
     getOptionManager().close();

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 0401d58..e88d1b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -17,7 +17,17 @@
  */
 package org.apache.drill.exec.server.rest;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.base.JsonMappingExceptionMapper;
+import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper;
+import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.rest.WebUserConnection.AnonWebUserConnection;
 import org.apache.drill.exec.server.rest.auth.AuthDynamicFeature;
 import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
 import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal.AnonDrillUserPrincipal;
@@ -36,13 +46,13 @@ import org.glassfish.jersey.server.ServerProperties;
 import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature;
 import org.glassfish.jersey.server.mvc.freemarker.FreemarkerMvcFeature;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.jaxrs.base.JsonMappingExceptionMapper;
-import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper;
-import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;
-
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpSession;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.security.Principal;
 
 public class DrillRestServer extends ResourceConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRestServer.class);
@@ -70,7 +80,8 @@ public class DrillRestServer extends ResourceConfig {
     }
 
     //disable moxy so it doesn't conflict with jackson.
-    final String disableMoxy = PropertiesHelper.getPropertyNameForRuntime(CommonProperties.MOXY_JSON_FEATURE_DISABLE, getConfiguration().getRuntimeType());
+    final String disableMoxy = PropertiesHelper.getPropertyNameForRuntime(CommonProperties.MOXY_JSON_FEATURE_DISABLE,
+        getConfiguration().getRuntimeType());
     property(disableMoxy, true);
 
     register(JsonParseExceptionMapper.class);
@@ -91,13 +102,136 @@ public class DrillRestServer extends ResourceConfig {
         bind(new UserAuthEnabled(isAuthEnabled)).to(UserAuthEnabled.class);
         if (isAuthEnabled) {
           bindFactory(DrillUserPrincipalProvider.class).to(DrillUserPrincipal.class);
+          bindFactory(AuthWebUserConnectionProvider.class).to(WebUserConnection.class);
         } else {
           bindFactory(AnonDrillUserPrincipalProvider.class).to(DrillUserPrincipal.class);
+          bindFactory(AnonWebUserConnectionProvider.class).to(WebUserConnection.class);
         }
       }
     });
   }
 
+  public static class AuthWebUserConnectionProvider implements Factory<WebUserConnection> {
+
+    @Inject
+    HttpServletRequest request;
+
+    @Inject
+    WorkManager workManager;
+
+    @Override
+    public WebUserConnection provide() {
+      final HttpSession session = request.getSession();
+      final Principal sessionUserPrincipal = request.getUserPrincipal();
+
+      // If there is no valid principal this means user is not logged in yet.
+      if (sessionUserPrincipal == null) {
+        return null;
+      }
+
+      // User is logged in, get/set the WebSessionResources attribute
+      WebSessionResources webSessionResources =
+              (WebSessionResources) session.getAttribute(WebSessionResources.class.getSimpleName());
+
+      if (webSessionResources == null) {
+        // User is logging in for the first time
+        final DrillbitContext drillbitContext = workManager.getContext();
+        final DrillConfig config = drillbitContext.getConfig();
+        final UserSession drillUserSession = UserSession.Builder.newBuilder()
+                .withCredentials(UserBitShared.UserCredentials.newBuilder()
+                        .setUserName(sessionUserPrincipal.getName())
+                        .build())
+                .withOptionManager(drillbitContext.getOptionManager())
+                .setSupportComplexTypes(config.getBoolean(ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES))
+                .build();
+
+        // Only try getting remote address in first login since it's a costly operation.
+        SocketAddress remoteAddress = null;
+        try {
+          // This can be slow as the underlying library will try to resolve the address
+          remoteAddress = new InetSocketAddress(InetAddress.getByName(request.getRemoteAddr()), request.getRemotePort());
+          session.setAttribute(SocketAddress.class.getSimpleName(), remoteAddress);
+        } catch (Exception ex) {
+          //no-op
+          logger.trace("Failed to get the remote address of the http session request", ex);
+        }
+
+        // Create per session BufferAllocator and set it in session
+        final String sessionAllocatorName = String.format("WebServer:AuthUserSession:%s", session.getId());
+        final BufferAllocator sessionAllocator = workManager.getContext().getAllocator().newChildAllocator(
+                sessionAllocatorName,
+                config.getLong(ExecConstants.HTTP_SESSION_MEMORY_RESERVATION),
+                config.getLong(ExecConstants.HTTP_SESSION_MEMORY_MAXIMUM));
+
+        // Create a WebSessionResource instance which owns the lifecycle of all the session resources.
+        // Set this instance as an attribute of HttpSession, since it will be used until session is destroyed.
+        webSessionResources = new WebSessionResources(sessionAllocator, remoteAddress, drillUserSession);
+        session.setAttribute(WebSessionResources.class.getSimpleName(), webSessionResources);
+      }
+      // Create a new WebUserConnection for the request
+      return new WebUserConnection(webSessionResources);
+    }
+
+    @Override
+    public void dispose(WebUserConnection instance) {
+
+    }
+  }
+
+  public static class AnonWebUserConnectionProvider implements Factory<WebUserConnection> {
+
+    @Inject
+    HttpServletRequest request;
+
+    @Inject
+    WorkManager workManager;
+
+    @Override
+    public WebUserConnection provide() {
+      final HttpSession session = request.getSession();
+      final DrillbitContext drillbitContext = workManager.getContext();
+      final DrillConfig config = drillbitContext.getConfig();
+
+      // Create an allocator here for each request
+      final BufferAllocator sessionAllocator = drillbitContext.getAllocator()
+              .newChildAllocator("WebServer:AnonUserSession",
+                      config.getLong(ExecConstants.HTTP_SESSION_MEMORY_RESERVATION),
+                      config.getLong(ExecConstants.HTTP_SESSION_MEMORY_MAXIMUM));
+
+      final Principal sessionUserPrincipal = new AnonDrillUserPrincipal();
+
+      // Create new UserSession for each request from Anonymous user
+      final UserSession drillUserSession = UserSession.Builder.newBuilder()
+              .withCredentials(UserBitShared.UserCredentials.newBuilder()
+                      .setUserName(sessionUserPrincipal.getName())
+                      .build())
+              .withOptionManager(drillbitContext.getOptionManager())
+              .setSupportComplexTypes(drillbitContext.getConfig().getBoolean(ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES))
+              .build();
+
+      // Try to get the remote Address but set it to null in case of failure.
+      SocketAddress remoteAddress = null;
+      try {
+        // This can be slow as the underlying library will try to resolve the address
+        remoteAddress = new InetSocketAddress(InetAddress.getByName(request.getRemoteAddr()), request.getRemotePort());
+      } catch (Exception ex) {
+        // no-op
+        logger.trace("Failed to get the remote address of the http session request", ex);
+      }
+
+      final WebSessionResources webSessionResources = new WebSessionResources(sessionAllocator,
+              remoteAddress, drillUserSession);
+
+      // Create an AnonWebUserConnection for this request
+      return new AnonWebUserConnection(webSessionResources);
+    }
+
+    @Override
+    public void dispose(WebUserConnection instance) {
+
+    }
+  }
+
   // Provider which injects DrillUserPrincipal directly instead of getting it from SecurityContext and typecasting
   public static class DrillUserPrincipalProvider implements Factory<DrillUserPrincipal> {
 
@@ -116,12 +250,11 @@ public class DrillRestServer extends ResourceConfig {
 
   // Provider which creates and cleanups DrillUserPrincipal for anonymous (auth disabled) mode
   public static class AnonDrillUserPrincipalProvider implements Factory<DrillUserPrincipal> {
-    @Inject WorkManager workManager;
 
     @RequestScoped
     @Override
     public DrillUserPrincipal provide() {
-      return new AnonDrillUserPrincipal(workManager.getContext());
+      return new AnonDrillUserPrincipal();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
index 433efaf..99e26ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,8 +17,14 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
+import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
+import org.apache.drill.exec.server.rest.QueryWrapper.QueryResult;
+import org.apache.drill.exec.work.WorkManager;
+import org.glassfish.jersey.server.mvc.Viewable;
 
 import javax.annotation.security.RolesAllowed;
 import javax.inject.Inject;
@@ -30,16 +36,8 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.SecurityContext;
-
-import com.google.common.base.CharMatcher;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
-import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
-import org.apache.drill.exec.work.WorkManager;
-import org.glassfish.jersey.server.mvc.Viewable;
+import java.util.List;
+import java.util.Map;
 
 @Path("/")
 @RolesAllowed(DrillUserPrincipal.AUTHENTICATED_ROLE)
@@ -49,7 +47,8 @@ public class QueryResources {
   @Inject UserAuthEnabled authEnabled;
   @Inject WorkManager work;
   @Inject SecurityContext sc;
-  @Inject DrillUserPrincipal principal;
+  @Inject WebUserConnection webUserConnection;
+
 
   @GET
   @Path("/query")
@@ -62,15 +61,13 @@ public class QueryResources {
   @Path("/query.json")
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces(MediaType.APPLICATION_JSON)
-  public QueryWrapper.QueryResult submitQueryJSON(QueryWrapper query) throws Exception {
-    DrillClient drillClient = null;
-
+  public QueryResult submitQueryJSON(QueryWrapper query) throws Exception {
     try {
-      final BufferAllocator allocator = work.getContext().getAllocator();
-      drillClient = principal.getDrillClient();
-      return query.run(drillClient, allocator);
+      // Run the query
+      return query.run(work, webUserConnection);
     } finally {
-      principal.recycleDrillClient(drillClient);
+      // no-op for authenticated user
+      webUserConnection.cleanupSession();
     }
   }
 
@@ -78,12 +75,14 @@ public class QueryResources {
   @Path("/query")
   @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
   @Produces(MediaType.TEXT_HTML)
-  public Viewable submitQuery(@FormParam("query") String query, @FormParam("queryType") String queryType) throws Exception {
+  public Viewable submitQuery(@FormParam("query") String query,
+                              @FormParam("queryType") String queryType) throws Exception {
     try {
       final String trimmedQueryString = CharMatcher.is(';').trimTrailingFrom(query.trim());
-      final QueryWrapper.QueryResult result = submitQueryJSON(new QueryWrapper(trimmedQueryString, queryType));
+      final QueryResult result = submitQueryJSON(new QueryWrapper(trimmedQueryString, queryType));
+
       return ViewableWithPermissions.create(authEnabled.get(), "/rest/query/result.ftl", sc, new TabularResult(result));
-    } catch(Exception | Error e) {
+    } catch (Exception | Error e) {
       logger.error("Query from Web UI Failed", e);
       return ViewableWithPermissions.create(authEnabled.get(), "/rest/query/errorMessage.ftl", sc, e);
     }
@@ -93,7 +92,7 @@ public class QueryResources {
     private final List<String> columns;
     private final List<List<String>> rows;
 
-    public TabularResult(QueryWrapper.QueryResult result) {
+    public TabularResult(QueryResult result) {
       final List<List<String>> rows = Lists.newArrayList();
       for (Map<String, String> rowMap:result.rows) {
         final List<String> row = Lists.newArrayList();
@@ -119,4 +118,6 @@ public class QueryResources {
       return rows;
     }
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index 6784b82..4a168dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,41 +18,27 @@
 
 package org.apache.drill.exec.server.rest;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.proto.UserProtos.QueryResultsMode;
+import org.apache.drill.exec.work.WorkManager;
+
+import javax.xml.bind.annotation.XmlRootElement;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-
-import javax.xml.bind.annotation.XmlRootElement;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.rpc.user.UserResultsListener;
-import org.apache.drill.exec.vector.ValueVector;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
 
 @XmlRootElement
 public class QueryWrapper {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWrapper.class);
 
-  private String query;
-  private String queryType;
+  private final String query;
+
+  private final String queryType;
 
   @JsonCreator
   public QueryWrapper(@JsonProperty("query") String query, @JsonProperty("queryType") String queryType) {
@@ -68,36 +54,38 @@ public class QueryWrapper {
     return queryType;
   }
 
-  public UserBitShared.QueryType getType() {
-    UserBitShared.QueryType type = UserBitShared.QueryType.SQL;
-    switch (queryType) {
-      case "SQL" : type = UserBitShared.QueryType.SQL; break;
-      case "LOGICAL" : type = UserBitShared.QueryType.LOGICAL; break;
-      case "PHYSICAL" : type = UserBitShared.QueryType.PHYSICAL; break;
-    }
-    return type;
+  public QueryType getType() {
+    return QueryType.valueOf(queryType);
   }
 
-  public QueryResult run(final DrillClient client, final BufferAllocator allocator) throws Exception {
-    Listener listener = new Listener(allocator);
-    client.runQuery(getType(), query, listener);
-    listener.waitForCompletion();
-    if (listener.results.isEmpty()) {
-      listener.results.add(Maps.<String, String>newHashMap());
+  public QueryResult run(final WorkManager workManager, final WebUserConnection webUserConnection) throws Exception {
+
+    final RunQuery runQuery = RunQuery.newBuilder().setType(getType())
+        .setPlan(getQuery())
+        .setResultsMode(QueryResultsMode.STREAM_FULL)
+        .build();
+
+    // Submit user query to Drillbit work queue.
+    final QueryId queryId = workManager.getUserWorker().submitWork(webUserConnection, runQuery);
+
+    // Wait until the query execution is complete or there is an error submitting the query
+    webUserConnection.await();
+
+    if (logger.isTraceEnabled()) {
+      logger.trace("Query {} is completed ", queryId);
     }
 
-    final Map<String, String> first = listener.results.get(0);
-    for (String columnName : listener.columns) {
-      if (!first.containsKey(columnName)) {
-        first.put(columnName, null);
-      }
+    if (webUserConnection.results.isEmpty()) {
+      webUserConnection.results.add(Maps.<String, String>newHashMap());
     }
 
-    return new QueryResult(listener.columns, listener.results);
+    // Return the QueryResult.
+    return new QueryResult(webUserConnection.columns, webUserConnection.results);
   }
 
   public static class QueryResult {
     public final Collection<String> columns;
+
     public final List<Map<String, String>> rows;
 
     public QueryResult(Collection<String> columns, List<Map<String, String>> rows) {
@@ -111,77 +99,4 @@ public class QueryWrapper {
     return "QueryRequest [queryType=" + queryType + ", query=" + query + "]";
   }
 
-
-  private static class Listener implements UserResultsListener {
-    private volatile UserException exception;
-    private final CountDownLatch latch = new CountDownLatch(1);
-    private final BufferAllocator allocator;
-    public final List<Map<String, String>> results = Lists.newArrayList();
-    public final Set<String> columns = Sets.newLinkedHashSet();
-
-    Listener(BufferAllocator allocator) {
-      this.allocator = Preconditions.checkNotNull(allocator, "allocator cannot be null");
-    }
-
-    @Override
-    public void submissionFailed(UserException ex) {
-      exception = ex;
-      logger.error("Query Failed", ex);
-      latch.countDown();
-    }
-
-    @Override
-    public void queryCompleted(QueryState state) {
-      latch.countDown();
-    }
-
-    @Override
-    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      try {
-        final int rows = result.getHeader().getRowCount();
-        if (result.hasData()) {
-          RecordBatchLoader loader = null;
-          try {
-            loader = new RecordBatchLoader(allocator);
-            loader.load(result.getHeader().getDef(), result.getData());
-            // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
-            // SchemaChangeException, so check/clean catch clause below.
-            for (int i = 0; i < loader.getSchema().getFieldCount(); ++i) {
-              columns.add(loader.getSchema().getColumn(i).getPath());
-            }
-            for (int i = 0; i < rows; ++i) {
-              final Map<String, String> record = Maps.newHashMap();
-              for (VectorWrapper<?> vw : loader) {
-                final String field = vw.getValueVector().getMetadata().getNamePart().getName();
-                final ValueVector.Accessor accessor = vw.getValueVector().getAccessor();
-                final Object value = i < accessor.getValueCount() ? accessor.getObject(i) : null;
-                final String display = value == null ? null : value.toString();
-                record.put(field, display);
-              }
-              results.add(record);
-            }
-          } finally {
-            if (loader != null) {
-              loader.clear();
-            }
-          }
-        }
-      } catch (SchemaChangeException e) {
-        throw new RuntimeException(e);
-      } finally {
-        result.release();
-      }
-    }
-
-    @Override
-    public void queryIdArrived(UserBitShared.QueryId queryId) {
-    }
-
-    public void waitForCompletion() throws Exception {
-      latch.await();
-      if (exception != null) {
-        throw exception;
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index 685a823..b3fb692 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -90,9 +90,13 @@ public class WebServer implements AutoCloseable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebServer.class);
 
   private final DrillConfig config;
+
   private final MetricRegistry metrics;
+
   private final WorkManager workManager;
+
   private final Server embeddedJetty;
+
   private final BootStrapContext context;
 
   /**
@@ -115,6 +119,7 @@ public class WebServer implements AutoCloseable {
   }
 
   private static final String BASE_STATIC_PATH = "/rest/static/";
+
   private static final String DRILL_ICON_RESOURCE_RELATIVE_PATH = "img/drill.ico";
 
   /**
@@ -126,8 +131,7 @@ public class WebServer implements AutoCloseable {
       return;
     }
     final boolean authEnabled = config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED);
-    if (authEnabled && !context.getAuthProvider()
-        .containsFactory(PlainFactory.SIMPLE_NAME)) {
+    if (authEnabled && !context.getAuthProvider().containsFactory(PlainFactory.SIMPLE_NAME)) {
       logger.warn("Not starting web server. Currently Drill supports web authentication only through " +
           "username/password. But PLAIN mechanism is not configured.");
       return;
@@ -154,8 +158,7 @@ public class WebServer implements AutoCloseable {
     servletHolder.setInitOrder(1);
     servletContextHandler.addServlet(servletHolder, "/*");
 
-    servletContextHandler.addServlet(
-        new ServletHolder(new MetricsServlet(metrics)), "/status/metrics");
+    servletContextHandler.addServlet(new ServletHolder(new MetricsServlet(metrics)), "/status/metrics");
     servletContextHandler.addServlet(new ServletHolder(new ThreadDumpServlet()), "/status/threads");
 
     final ServletHolder staticHolder = new ServletHolder("static", DefaultServlet.class);
@@ -170,8 +173,8 @@ public class WebServer implements AutoCloseable {
     servletContextHandler.addServlet(staticHolder, "/static/*");
 
     if (authEnabled) {
-        servletContextHandler.setSecurityHandler(createSecurityHandler());
-        servletContextHandler.setSessionHandler(createSessionHandler(servletContextHandler.getSecurityHandler()));
+      servletContextHandler.setSecurityHandler(createSecurityHandler());
+      servletContextHandler.setSessionHandler(createSessionHandler(servletContextHandler.getSecurityHandler()));
     }
 
     if (config.getBoolean(ExecConstants.HTTP_CORS_ENABLED)) {
@@ -185,7 +188,7 @@ public class WebServer implements AutoCloseable {
       holder.setInitParameter(CrossOriginFilter.ALLOW_CREDENTIALS_PARAM,
               String.valueOf(config.getBoolean(ExecConstants.HTTP_CORS_CREDENTIALS)));
 
-      for (String path: new String[] { "*.json", "/storage/*/enable/*", "/status*" }) {
+      for (String path : new String[]{"*.json", "/storage/*/enable/*", "/status*"}) {
         servletContextHandler.addFilter(holder, path, EnumSet.of(DispatcherType.REQUEST));
       }
     }
@@ -203,7 +206,7 @@ public class WebServer implements AutoCloseable {
     sessionManager.addEventListener(new HttpSessionListener() {
       @Override
       public void sessionCreated(HttpSessionEvent se) {
-        // No-op
+
       }
 
       @Override
@@ -219,6 +222,15 @@ public class WebServer implements AutoCloseable {
           securityHandler.logout(sessionAuth);
           session.removeAttribute(SessionAuthentication.__J_AUTHENTICATED);
         }
+
+        // Clear all the resources allocated for this session
+        final WebSessionResources webSessionResources =
+            (WebSessionResources) session.getAttribute(WebSessionResources.class.getSimpleName());
+
+        if (webSessionResources != null) {
+          webSessionResources.close();
+          session.removeAttribute(WebSessionResources.class.getSimpleName());
+        }
       }
     });
 

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
new file mode 100644
index 0000000..aeed51a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.server.rest;
+
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.ChannelClosedException;
+import org.apache.drill.exec.rpc.user.UserSession;
+
+import java.net.SocketAddress;
+
+/**
+ * Class holding all the resources required for Web User Session. This class is responsible for the proper cleanup of
+ * all the resources.
+ */
+public class WebSessionResources implements AutoCloseable {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebSessionResources.class);
+
+  private BufferAllocator allocator;
+
+  private final SocketAddress remoteAddress;
+
+  private UserSession webUserSession;
+
+  private ChannelPromise closeFuture;
+
+  WebSessionResources(BufferAllocator allocator, SocketAddress remoteAddress, UserSession userSession) {
+    this.allocator = allocator;
+    this.remoteAddress = remoteAddress;
+    this.webUserSession = userSession;
+    closeFuture = new DefaultChannelPromise(null);
+  }
+
+  public UserSession getSession() {
+    return webUserSession;
+  }
+
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  public ChannelPromise getCloseFuture() {
+    return closeFuture;
+  }
+
+  public SocketAddress getRemoteAddress() {
+    return remoteAddress;
+  }
+
+  @Override
+  public void close() {
+
+    try {
+      AutoCloseables.close(webUserSession, allocator);
+    } catch (Exception ex) {
+      logger.error("Failure while closing the session resources", ex);
+    }
+
+    // Set the close future associated with this session.
+    if (closeFuture != null) {
+      closeFuture.setFailure(new ChannelClosedException("Http Session of the user is closed."));
+      closeFuture = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
new file mode 100644
index 0000000..62c6efd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+import io.netty.channel.ChannelFuture;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.vector.ValueVector.Accessor;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * WebUserConnection which represents the UserClientConnection for the WebUser submitting the query. It provides
+ * access to the UserSession executing the query. There is no actual physical channel corresponding to this connection
+ * wrapper.
+ */
+
+public class WebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebUserConnection.class);
+
+  protected WebSessionResources webSessionResources;
+
+  public final List<Map<String, String>> results = Lists.newArrayList();
+
+  public final Set<String> columns = Sets.newLinkedHashSet();
+
+  WebUserConnection(WebSessionResources webSessionResources) {
+    this.webSessionResources = webSessionResources;
+  }
+
+  @Override
+  public UserSession getSession() {
+    return webSessionResources.getSession();
+  }
+
+  @Override
+  public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result) {
+    // Check if there is any data or not. There can be overflow here but DrillBuf doesn't support allocating with
+    // bytes in long. Hence we are just preserving the earlier behavior and logging debug log for the case.
+    final int dataByteCount = (int) result.getByteCount();
+
+    if (dataByteCount <= 0) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Either no data received in this batch or there is BufferOverflow in dataByteCount: {}",
+            dataByteCount);
+      }
+      listener.success(Acks.OK, null);
+      return;
+    }
+
+    // If here that means there is some data for sure. Create a ByteBuf with all the data in it.
+    final int rows = result.getHeader().getRowCount();
+    final BufferAllocator allocator = webSessionResources.getAllocator();
+    final DrillBuf bufferWithData = allocator.buffer(dataByteCount);
+    try {
+      final ByteBuf[] resultDataBuffers = result.getBuffers();
+
+      for (final ByteBuf buffer : resultDataBuffers) {
+        bufferWithData.writeBytes(buffer);
+        buffer.release();
+      }
+
+      final RecordBatchLoader loader = new RecordBatchLoader(allocator);
+      try {
+        loader.load(result.getHeader().getDef(), bufferWithData);
+        // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
+        // SchemaChangeException, so check/clean catch clause below.
+        for (int i = 0; i < loader.getSchema().getFieldCount(); ++i) {
+          columns.add(loader.getSchema().getColumn(i).getPath());
+        }
+        for (int i = 0; i < rows; ++i) {
+          final Map<String, String> record = Maps.newHashMap();
+          for (VectorWrapper<?> vw : loader) {
+            final String field = vw.getValueVector().getMetadata().getNamePart().getName();
+            final Accessor accessor = vw.getValueVector().getAccessor();
+            final Object value = i < accessor.getValueCount() ? accessor.getObject(i) : null;
+            final String display = value == null ? null : value.toString();
+            record.put(field, display);
+          }
+          results.add(record);
+        }
+      } finally {
+        loader.clear();
+      }
+    } catch (Exception e) {
+      exception = UserException.systemError(e).build(logger);
+    } finally {
+      // Notify the listener with ACK.OK both in error/success case because data was sent successfully from Drillbit.
+      bufferWithData.release();
+      listener.success(Acks.OK, null);
+    }
+  }
+
+  @Override
+  public ChannelFuture getChannelClosureFuture() {
+    return webSessionResources.getCloseFuture();
+  }
+
+  @Override
+  public SocketAddress getRemoteAddress() {
+    return webSessionResources.getRemoteAddress();
+  }
+
+  @Override
+  public void setAutoRead(boolean enableAutoRead) {
+    // no-op
+  }
+
+  /**
+   * For authenticated WebUser no cleanup of {@link WebSessionResources} is done since it's re-used
+   * for all the queries for the lifetime of the web session.
+   */
+  public void cleanupSession() {
+    // no-op
+  }
+
+  public static class AnonWebUserConnection extends WebUserConnection {
+
+    AnonWebUserConnection(WebSessionResources webSessionResources) {
+      super(webSessionResources);
+    }
+
+    /**
+     * For anonymous WebUser after each query request is completed the {@link WebSessionResources} is cleaned up.
+     */
+    @Override
+    public void cleanupSession() {
+      webSessionResources.close();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java
deleted file mode 100644
index 62ddca9..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.server.rest.auth;
-
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.eclipse.jetty.security.DefaultIdentityService;
-import org.eclipse.jetty.security.IdentityService;
-import org.eclipse.jetty.security.LoginService;
-import org.eclipse.jetty.server.UserIdentity;
-
-import java.util.Properties;
-
-/**
- * LoginService implementation which abstracts common functionality needed when user authentication is enabled or
- * disabled.
- */
-public abstract class AbstractDrillLoginService implements LoginService {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractDrillLoginService.class);
-
-  protected final DrillbitContext drillbitContext;
-  protected IdentityService identityService = new DefaultIdentityService();
-
-  public AbstractDrillLoginService(final DrillbitContext drillbitContext) {
-    this.drillbitContext = drillbitContext;
-  }
-
-  protected DrillClient createDrillClient(final String userName, final String password) throws Exception {
-    DrillClient drillClient = null;
-
-    try {
-      // Create a DrillClient
-      drillClient = new DrillClient(drillbitContext.getConfig(),
-          drillbitContext.getClusterCoordinator(), drillbitContext.getAllocator());
-      final Properties props = new Properties();
-      props.setProperty("user", userName);
-      if (password != null) {
-        props.setProperty("password", password);
-      }
-      drillClient.connect(props);
-      return  drillClient;
-    } catch (final Exception e) {
-      AutoCloseables.close(e, drillClient);
-      throw e;
-    }
-  }
-
-  @Override
-  public boolean validate(UserIdentity user) {
-    // This is called for every request after authentication is complete to make sure the user is still valid.
-    // Once a user is authenticated we assume that the user is still valid. This behavior is similar to ODBC/JDBC where
-    // once a user is logged-in we don't recheck the credentials again in the same session.
-    return true;
-  }
-
-  @Override
-  public IdentityService getIdentityService() {
-    return identityService;
-  }
-
-  @Override
-  public void setIdentityService(IdentityService identityService) {
-    this.identityService = identityService;
-  }
-
-  /**
-   * This gets called whenever a session is invalidated (because of user logout) or timed out.
-   * @param user
-   */
-  @Override
-  public void logout(UserIdentity user) {
-    final DrillUserPrincipal principal = (DrillUserPrincipal) user.getUserPrincipal();
-    try {
-      principal.close();
-    } catch (final Exception e) {
-      logger.error("Failure in logging out.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillRestLoginService.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillRestLoginService.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillRestLoginService.java
index d865e94..2231ac7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillRestLoginService.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillRestLoginService.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,13 +17,17 @@
  */
 package org.apache.drill.exec.server.rest.auth;
 
-import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
+import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.exec.rpc.security.plain.PlainFactory;
+import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
+import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.eclipse.jetty.security.DefaultIdentityService;
+import org.eclipse.jetty.security.IdentityService;
+import org.eclipse.jetty.security.LoginService;
 import org.eclipse.jetty.server.UserIdentity;
 
 import javax.security.auth.Subject;
@@ -33,11 +37,23 @@ import java.security.Principal;
  * LoginService used when user authentication is enabled in Drillbit. It validates the user against the user
  * authenticator set in BOOT config.
  */
-public class DrillRestLoginService extends AbstractDrillLoginService {
+public class DrillRestLoginService implements LoginService {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRestLoginService.class);
 
+  private final DrillbitContext drillbitContext;
+
+  private IdentityService identityService = new DefaultIdentityService();
+
   public DrillRestLoginService(final DrillbitContext drillbitContext) {
-    super(drillbitContext);
+    this.drillbitContext = drillbitContext;
+  }
+
+  @Override
+  public boolean validate(UserIdentity user) {
+    // This is called for every request after authentication is complete to make sure the user is still valid.
+    // Once a user is authenticated we assume that the user is still valid. This behavior is similar to ODBC/JDBC where
+    // once a user is logged-in we don't recheck the credentials again in the same session.
+    return true;
   }
 
   @Override
@@ -51,18 +67,26 @@ public class DrillRestLoginService extends AbstractDrillLoginService {
       return null;
     }
 
-    DrillClient drillClient = null;
-
     try {
-      // Create a DrillClient
-      drillClient = createDrillClient(username, (String)credentials);
+      // Authenticate the WebUser locally using UserAuthenticator. A started WebServer guarantees that the
+      // PLAIN mechanism is configured and that an authenticator is available.
+      final AuthenticatorFactory plainFactory = drillbitContext.getAuthProvider()
+          .getAuthenticatorFactory(PlainFactory.SIMPLE_NAME);
+      final UserAuthenticator userAuthenticator = ((PlainFactory) plainFactory).getAuthenticator();
+
+      // Authenticate the user with configured Authenticator
+      userAuthenticator.authenticate(username, credentials.toString());
+
+      logger.debug("WebUser {} is successfully authenticated", username);
 
       final SystemOptionManager sysOptions = drillbitContext.getOptionManager();
+
       final boolean isAdmin = ImpersonationUtil.hasAdminPrivileges(username,
           sysOptions.getOption(ExecConstants.ADMIN_USERS_KEY).string_val,
           sysOptions.getOption(ExecConstants.ADMIN_USER_GROUPS_KEY).string_val);
 
-      final Principal userPrincipal = new DrillUserPrincipal(username, isAdmin, drillClient);
+      // Create the UserPrincipal corresponding to logged in user.
+      final Principal userPrincipal = new DrillUserPrincipal(username, isAdmin);
 
       final Subject subject = new Subject();
       subject.getPrincipals().add(userPrincipal);
@@ -76,13 +100,34 @@ public class DrillRestLoginService extends AbstractDrillLoginService {
         return identityService.newUserIdentity(subject, userPrincipal, DrillUserPrincipal.NON_ADMIN_USER_ROLES);
       }
     } catch (final Exception e) {
-      AutoCloseables.close(e, drillClient);
-      if (e.getMessage().contains(HandshakeStatus.AUTH_FAILED.toString())) {
-        logger.trace("Authentication failed for user '{}'", username, e);
+      if (e instanceof UserAuthenticationException) {
+        logger.debug("Authentication failed for WebUser '{}'", username, e);
       } else {
-        logger.error("Error while creating the DrillClient: user '{}'", username, e);
+        logger.error("UnExpected failure occurred for WebUser {} during login.", username, e);
       }
       return null;
     }
   }
+
+  @Override
+  public IdentityService getIdentityService() {
+    return identityService;
+  }
+
+  @Override
+  public void setIdentityService(IdentityService identityService) {
+    this.identityService = identityService;
+  }
+
+  /**
+   * This gets called whenever a session is invalidated (because of user logout) or timed out.
+   * @param user - logged in UserIdentity
+   */
+  @Override
+  public void logout(UserIdentity user) {
+    // Nothing to clean up on logout; only trace the event.
+    if(logger.isTraceEnabled()) {
+      logger.trace("Web user {} logged out.", user.getUserPrincipal().getName());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java
index 18539ff..6d8f301 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,43 +18,37 @@
 package org.apache.drill.exec.server.rest.auth;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.server.DrillbitContext;
 import org.eclipse.jetty.security.MappedLoginService.RolePrincipal;
 
-import java.io.IOException;
 import java.security.Principal;
 import java.util.List;
 
+
 /**
- * Captures Drill user credentials and resources in a session.
+ * Captures Drill user credentials and privileges of the session user.
  */
-public class DrillUserPrincipal implements Principal, AutoCloseable {
+public class DrillUserPrincipal implements Principal {
   public static final String ANONYMOUS_USER = "anonymous";
 
   public static final String AUTHENTICATED_ROLE = "authenticated";
+
   public static final String ADMIN_ROLE = "admin";
 
-  public static final String[] ADMIN_USER_ROLES = new String[] { AUTHENTICATED_ROLE, ADMIN_ROLE };
-  public static final String[] NON_ADMIN_USER_ROLES = new String[] { AUTHENTICATED_ROLE };
+  public static final String[] ADMIN_USER_ROLES = new String[]{AUTHENTICATED_ROLE, ADMIN_ROLE};
 
-  public static final List<RolePrincipal> ADMIN_PRINCIPALS = ImmutableList.of(
-      new RolePrincipal(AUTHENTICATED_ROLE),
-      new RolePrincipal(ADMIN_ROLE));
+  public static final String[] NON_ADMIN_USER_ROLES = new String[]{AUTHENTICATED_ROLE};
 
-  public static final List<RolePrincipal> NON_ADMIN_PRINCIPALS =
-      ImmutableList.of(new RolePrincipal(AUTHENTICATED_ROLE));
+  public static final List<RolePrincipal> ADMIN_PRINCIPALS = ImmutableList.of(new RolePrincipal(AUTHENTICATED_ROLE), new RolePrincipal(ADMIN_ROLE));
 
-  protected DrillClient drillClient;
+  public static final List<RolePrincipal> NON_ADMIN_PRINCIPALS = ImmutableList.of(new RolePrincipal(AUTHENTICATED_ROLE));
 
   private final String userName;
+
   private final boolean isAdmin;
 
-  public DrillUserPrincipal(final String userName, final boolean isAdmin, final DrillClient drillClient) {
+  public DrillUserPrincipal(final String userName, final boolean isAdmin) {
     this.userName = userName;
     this.isAdmin = isAdmin;
-    this.drillClient = drillClient;
   }
 
   @Override
@@ -63,24 +57,10 @@ public class DrillUserPrincipal implements Principal, AutoCloseable {
   }
 
   /**
-   * @return Return {@link DrillClient} instanced with credentials of this user principal. Returned {@link DrillClient}
-   * must be returned using {@link #recycleDrillClient(DrillClient)} for proper resource cleanup.
-   */
-  public DrillClient getDrillClient() throws IOException {
-    return drillClient;
-  }
-
-  /**
-   * Return {@link DrillClient} returned from {@link #getDrillClient()} for proper resource cleanup or reuse.
-   */
-  public void recycleDrillClient(final DrillClient client) throws IOException {
-    // default is no-op. we reuse DrillClient
-  }
-
-  /**
    * Is the user identified by this user principal can manage (read) the profile owned by the given user?
+   *
    * @param profileOwner Owner of the profile.
-   * @return
+   * @return true if this user is an admin or is the profile owner, false otherwise
    */
   public boolean canManageProfileOf(final String profileOwner) {
     return isAdmin || userName.equals(profileOwner);
@@ -88,49 +68,21 @@ public class DrillUserPrincipal implements Principal, AutoCloseable {
 
   /**
    * Is the user identified by this user principal can manage (cancel) the query issued by the given user?
+   *
    * @param queryUser User who launched the query.
-   * @return
+   * @return true if this user is an admin or is the user who launched the query, false otherwise
    */
   public boolean canManageQueryOf(final String queryUser) {
     return isAdmin || userName.equals(queryUser);
   }
 
-  @Override
-  public void close() throws Exception {
-    if (drillClient != null) {
-      drillClient.close();
-      drillClient = null; // Reset it to null to avoid closing multiple times.
-    }
-  }
-
   /**
    * {@link DrillUserPrincipal} for anonymous (auth disabled) mode.
    */
   public static class AnonDrillUserPrincipal extends DrillUserPrincipal {
-    private final DrillbitContext drillbitContext;
-
-    public AnonDrillUserPrincipal(final DrillbitContext drillbitContext) {
-      super(ANONYMOUS_USER, true /* in anonymous (auth disabled) mode all users are admins */, null);
-      this.drillbitContext = drillbitContext;
-    }
-
-    @Override
-    public DrillClient getDrillClient() throws IOException {
-      try {
-        // Create a DrillClient
-        drillClient = new DrillClient(drillbitContext.getConfig(),
-            drillbitContext.getClusterCoordinator(), drillbitContext.getAllocator());
-        drillClient.connect();
-        return  drillClient;
-      } catch (final Exception e) {
-        AutoCloseables.close(e, drillClient);
-        throw new IOException("Failed to create DrillClient: " + e.getMessage(), e);
-      }
-    }
 
-    @Override
-    public void recycleDrillClient(DrillClient client) throws IOException {
-      drillClient.close();
+    public AnonDrillUserPrincipal() {
+      super(ANONYMOUS_USER, true /* in anonymous (auth disabled) mode all users are admins */);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index a2b09a8..5e5fef0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -73,7 +73,7 @@ import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.control.Controller;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.testing.ControlsInjector;

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
index 45b3a8d..c0d57ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
@@ -17,23 +17,9 @@
  */
 package org.apache.drill.exec.work.prepare;
 
-import static org.apache.drill.exec.ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS;
-import static org.apache.drill.exec.proto.UserProtos.RequestStatus.FAILED;
-import static org.apache.drill.exec.proto.UserProtos.RequestStatus.OK;
-import static org.apache.drill.exec.proto.UserProtos.RequestStatus.TIMEOUT;
-
-import java.math.BigDecimal;
-import java.net.SocketAddress;
-import java.sql.Date;
-import java.sql.ResultSetMetaData;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.ImmutableMap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
 import org.apache.drill.common.exceptions.ErrorHelper;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -45,8 +31,6 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.proto.UserProtos.ColumnSearchability;
@@ -59,20 +43,31 @@ import org.apache.drill.exec.proto.UserProtos.RequestStatus;
 import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
 import org.apache.drill.exec.work.user.UserWorker;
 import org.joda.time.Period;
 
-import com.google.common.collect.ImmutableMap;
+import java.math.BigDecimal;
+import java.net.SocketAddress;
+import java.sql.Date;
+import java.sql.ResultSetMetaData;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
+import static org.apache.drill.exec.ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.FAILED;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.OK;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.TIMEOUT;
 
 /**
  * Contains worker {@link Runnable} for creating a prepared statement and helper methods.
@@ -230,11 +225,9 @@ public class PreparedStatementProvider {
   /**
    * Decorator around {@link UserClientConnection} to tap the query results for LIMIT 0 query.
    */
-  private static class UserClientConnectionWrapper implements UserClientConnection {
+  private static class UserClientConnectionWrapper extends AbstractDisposableUserClientConnection {
     private final UserClientConnection inner;
-    private final CountDownLatch latch = new CountDownLatch(1);
 
-    private volatile DrillPBError error;
     private volatile List<SerializedField> fields;
 
     UserClientConnectionWrapper(UserClientConnection inner) {
@@ -257,27 +250,13 @@ public class PreparedStatementProvider {
     }
 
     @Override
-    public void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result) {
-      // Release the wait latch if the query is terminated.
-      final QueryState state = result.getQueryState();
-      if (state == QueryState.FAILED || state  == QueryState.CANCELED || state == QueryState.COMPLETED) {
-        if (state == QueryState.FAILED) {
-          error = result.getError(0);
-        }
-        latch.countDown();
-      }
-
-      listener.success(Acks.OK, null);
-    }
-
-    @Override
     public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result) {
       // Save the query results schema and release the buffers.
       if (fields == null) {
         fields = result.getHeader().getDef().getFieldList();
       }
 
-      for(ByteBuf buf : result.getBuffers()) {
+      for (ByteBuf buf : result.getBuffers()) {
         buf.release();
       }
 
@@ -285,24 +264,9 @@ public class PreparedStatementProvider {
     }
 
     /**
-     * Wait until the query has completed.
-     * @throws InterruptedException
-     */
-    boolean await(final long timeoutMillis) throws InterruptedException {
-      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * @return Any error returned in query execution.
-     */
-    DrillPBError getError() {
-      return error;
-    }
-
-    /**
      * @return Schema returned in query result batch.
      */
-    List<SerializedField> getFields() {
+    public List<SerializedField> getFields() {
       return fields;
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
index eb3e86c..7ffb224 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -33,7 +33,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.util.MemoryAllocationUtilities;
 import org.apache.drill.exec.util.Pointer;

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index b90b4d2..04135dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -32,7 +32,7 @@ import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.ResponseSender;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.rpc.user.UserSession.QueryCountIncrementer;
 import org.apache.drill.exec.server.options.OptionManager;

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 5ba4526..c2a2bf0 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -126,6 +126,12 @@ drill.exec: {
       allowedMethods: ["GET", "POST", "HEAD", "OPTIONS"],
       allowedHeaders: ["X-Requested-With", "Content-Type", "Accept", "Origin"],
       credentials: true
+    },
+    session: {
+        memory: {
+            reservation: 0,
+            maximum: 9223372036854775807
+        }
     }
   },
   network: {

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
index 09a61c1..3dc31ca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
@@ -36,7 +36,7 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMathFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMathFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMathFunctions.java
index 72d582c..a6c22c5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMathFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMathFunctions.java
@@ -35,7 +35,7 @@ import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
 import org.apache.drill.exec.proto.BitControl;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.vector.Float8Vector;
 import org.apache.drill.exec.vector.IntVector;

http://git-wip-us.apache.org/repos/asf/drill/blob/874bf629/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMultiInputAdd.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMultiInputAdd.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMultiInputAdd.java
index cf5c239..a259e8c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMultiInputAdd.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMultiInputAdd.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.pop.PopUnitTestBase;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
@@ -49,7 +49,7 @@ public class TestMultiInputAdd extends PopUnitTestBase {
 
 
     @Test
-    public void testMultiInputAdd(@Injectable final DrillbitContext bitContext, @Injectable UserServer.UserClientConnection connection) throws Throwable
+    public void testMultiInputAdd(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable
     {
         try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
              Drillbit bit = new Drillbit(CONFIG, serviceSet);


Mime
View raw message