drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sudhe...@apache.org
Subject [23/29] drill git commit: DRILL-4280: CORE (bit to bit authentication, control)
Date Sat, 25 Feb 2017 07:18:16 GMT
DRILL-4280: CORE (bit to bit authentication, control)

+ Support authentication in ControlServer and ControlClient
+ Add AuthenticationCommand as an initial command after handshake
  and before the command that initiates a connection
+ Add ControlConnectionConfig to encapsulate configuration
+ ControlMessageHandler now implements RequestHandler

control


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/180dd564
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/180dd564
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/180dd564

Branch: refs/heads/master
Commit: 180dd5648ab604b6396d91ba69a2f777f19bf79c
Parents: b6e59ec
Author: Sudheesh Katkam <sudheesh@apache.org>
Authored: Wed Jan 25 19:03:52 2017 -0800
Committer: Sudheesh Katkam <sudheesh@apache.org>
Committed: Fri Feb 24 19:01:43 2017 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   1 +
 .../exec/rpc/AbstractServerConnection.java      |   2 +-
 .../drill/exec/rpc/BitConnectionConfig.java     |  98 +++++++++++
 .../rpc/control/ConnectionManagerRegistry.java  |  38 ++---
 .../drill/exec/rpc/control/ControlClient.java   | 165 +++++++++++++++----
 .../exec/rpc/control/ControlConnection.java     |  80 +++++----
 .../rpc/control/ControlConnectionConfig.java    |  48 ++++++
 .../rpc/control/ControlConnectionManager.java   |  43 ++---
 .../exec/rpc/control/ControlRpcConfig.java      |   4 +-
 .../drill/exec/rpc/control/ControlServer.java   |  63 +++----
 .../drill/exec/rpc/control/ControlTunnel.java   |   9 +-
 .../drill/exec/rpc/control/Controller.java      |   3 +-
 .../drill/exec/rpc/control/ControllerImpl.java  |  29 ++--
 .../rpc/control/DefaultInstanceHandler.java     |   3 +
 .../org/apache/drill/exec/work/WorkManager.java |   3 +-
 .../exec/work/batch/ControlMessageHandler.java  |  37 +++--
 .../work/fragment/NonRootFragmentManager.java   |   2 +-
 .../src/main/resources/drill-module.conf        |   3 +-
 18 files changed, 448 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index b8f0c23..460702a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -114,6 +114,7 @@ public interface ExecConstants {
   String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
   String BIT_AUTHENTICATION_ENABLED = "drill.exec.security.bit.auth.enabled";
   String BIT_AUTHENTICATION_MECHANISM = "drill.exec.security.bit.auth.mechanism";
+  String USE_LOGIN_PRINCIPAL = "drill.exec.security.bit.auth.use_login_principal";
   /** Size of JDBC batch queue (in batches) above which throttling begins. */
   String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
       "drill.jdbc.batch_queue_throttling_threshold";

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
index 72af064..db87bfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
@@ -87,7 +87,7 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
   }
 
   @Override
-  public void finalizeSession() throws IOException {
+  public void finalizeSaslSession() throws IOException {
     final String authorizationID = getSaslServer().getAuthorizationID();
     final String remoteShortName = new HadoopKerberosName(authorizationID).getShortName();
     final String localShortName = UserGroupInformation.getLoginUser().getShortUserName();

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
new file mode 100644
index 0000000..71e5a86
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import org.apache.drill.common.KerberosUtil;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.security.AuthStringUtil;
+import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+// config for bit to bit connection
+public abstract class BitConnectionConfig extends AbstractConnectionConfig {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnectionConfig.class);
+
+  private final String authMechanismToUse;
+  private final boolean useLoginPrincipal;
+
+  protected BitConnectionConfig(BufferAllocator allocator, BootStrapContext context) throws DrillbitStartupException {
+    super(allocator, context);
+
+    final DrillConfig config = context.getConfig();
+    if (config.getBoolean(ExecConstants.BIT_AUTHENTICATION_ENABLED)) {
+      this.authMechanismToUse = config.getString(ExecConstants.BIT_AUTHENTICATION_MECHANISM);
+      try {
+        getAuthProvider().getAuthenticatorFactory(authMechanismToUse);
+      } catch (final SaslException e) {
+        throw new DrillbitStartupException(String.format(
+            "'%s' mechanism not found for bit-to-bit authentication. Please check authentication configuration.",
+            authMechanismToUse));
+      }
+      logger.info("Configured bit-to-bit connections to require authentication using: {}", authMechanismToUse);
+    } else {
+      this.authMechanismToUse = null;
+    }
+    this.useLoginPrincipal = config.getBoolean(ExecConstants.USE_LOGIN_PRINCIPAL);
+  }
+
+  // returns null iff auth is disabled
+  public String getAuthMechanismToUse() {
+    return authMechanismToUse;
+  }
+
+  // convenience method
+  public AuthenticatorFactory getAuthFactory(final List<String> remoteMechanisms) throws SaslException {
+    if (authMechanismToUse == null) {
+      throw new SaslException("Authentication is not enabled");
+    }
+    if (!AuthStringUtil.listContains(remoteMechanisms, authMechanismToUse)) {
+      throw new SaslException(String.format("Remote does not support authentication using '%s'", authMechanismToUse));
+    }
+    return getAuthProvider().getAuthenticatorFactory(authMechanismToUse);
+  }
+
+  public Map<String, ?> getSaslClientProperties(final DrillbitEndpoint remoteEndpoint) throws IOException {
+    final DrillProperties properties = DrillProperties.createEmpty();
+
+    final UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS) {
+      final HadoopKerberosName loginPrincipal = new HadoopKerberosName(loginUser.getUserName());
+      if (!useLoginPrincipal) {
+        properties.setProperty(DrillProperties.SERVICE_PRINCIPAL,
+            KerberosUtil.getPrincipalFromParts(loginPrincipal.getShortName(),
+                remoteEndpoint.getAddress(),
+                loginPrincipal.getRealm()));
+      } else {
+        properties.setProperty(DrillProperties.SERVICE_PRINCIPAL, loginPrincipal.toString());
+      }
+    }
+    return properties.stringPropertiesAsMap();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
index 1ac30e7..800cf3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,10 +20,7 @@ package org.apache.drill.exec.rpc.control;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.work.batch.ControlMessageHandler;
 
 import com.google.common.collect.Maps;
 
@@ -32,24 +29,21 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana
 
   private final ConcurrentMap<DrillbitEndpoint, ControlConnectionManager> registry = Maps.newConcurrentMap();
 
-  private final ControlMessageHandler handler;
-  private final BootStrapContext context;
-  private volatile DrillbitEndpoint localEndpoint;
-  private final BufferAllocator allocator;
+  private final ControlConnectionConfig config;
 
-  public ConnectionManagerRegistry(BufferAllocator allocator, ControlMessageHandler handler, BootStrapContext context) {
-    super();
-    this.handler = handler;
-    this.context = context;
-    this.allocator = allocator;
+  private DrillbitEndpoint localEndpoint;
+
+  public ConnectionManagerRegistry(ControlConnectionConfig config) {
+    this.config = config;
   }
 
-  public ControlConnectionManager getConnectionManager(DrillbitEndpoint endpoint) {
-    assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved";
-    ControlConnectionManager m = registry.get(endpoint);
+  public ControlConnectionManager getConnectionManager(DrillbitEndpoint remoteEndpoint) {
+    assert localEndpoint != null :
+        "DrillbitEndpoint must be set before a connection manager can be retrieved";
+    ControlConnectionManager m = registry.get(remoteEndpoint);
     if (m == null) {
-      m = new ControlConnectionManager(allocator, endpoint, localEndpoint, handler, context);
-      ControlConnectionManager m2 = registry.putIfAbsent(endpoint, m);
+      m = new ControlConnectionManager(config, localEndpoint, remoteEndpoint);
+      final ControlConnectionManager m2 = registry.putIfAbsent(remoteEndpoint, m);
       if (m2 != null) {
         m = m2;
       }
@@ -58,13 +52,13 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana
     return m;
   }
 
+  void setLocalEndpoint(final DrillbitEndpoint endpoint) {
+    this.localEndpoint = endpoint;
+  }
+
   @Override
   public Iterator<ControlConnectionManager> iterator() {
     return registry.values().iterator();
   }
 
-  public void setEndpoint(DrillbitEndpoint endpoint) {
-    this.localEndpoint = endpoint;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index c5bf6b5..6ebe1c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import com.google.common.util.concurrent.SettableFuture;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.socket.SocketChannel;
@@ -27,54 +28,54 @@ import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
 import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
 import org.apache.drill.exec.rpc.OutOfMemoryHandler;
 import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcBus;
-import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcCommand;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.work.batch.ControlMessageHandler;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.FailingRequestHandler;
 
 import com.google.protobuf.MessageLite;
+import org.apache.hadoop.security.UserGroupInformation;
 
-public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake>{
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
 
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
+public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
 
-  private final ControlMessageHandler handler;
   private final DrillbitEndpoint remoteEndpoint;
   private volatile ControlConnection connection;
   private final ControlConnectionManager.CloseHandlerCreator closeHandlerFactory;
-  private final DrillbitEndpoint localIdentity;
-  private final BufferAllocator allocator;
-
-  public ControlClient(BufferAllocator allocator, DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint,
-      ControlMessageHandler handler,
-      BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
-    super(ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
-        allocator.getAsByteBufAllocator(),
-        context.getBitLoopGroup(),
+  private final ControlConnectionConfig config;
+
+  public ControlClient(ControlConnectionConfig config, DrillbitEndpoint remoteEndpoint,
+                       ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
+    super(ControlRpcConfig.getMapping(config.getBootstrapContext().getConfig(),
+        config.getBootstrapContext().getExecutor()),
+        config.getAllocator().getAsByteBufAllocator(),
+        config.getBootstrapContext().getBitLoopGroup(),
         RpcType.HANDSHAKE,
         BitControlHandshake.class,
         BitControlHandshake.PARSER);
-    this.localIdentity = localEndpoint;
+    this.config = config;
     this.remoteEndpoint = remoteEndpoint;
-    this.handler = handler;
     this.closeHandlerFactory = closeHandlerFactory;
-    this.allocator = context.getAllocator();
-  }
-
-  public void connect(RpcConnectionHandler<ControlConnection> connectionHandler) {
-    connectAsClient(connectionHandler, BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getControlPort());
   }
 
   @SuppressWarnings("unchecked")
   @Override
-  public ControlConnection initRemoteConnection(SocketChannel channel) {
+  protected ControlConnection initRemoteConnection(SocketChannel channel) {
     super.initRemoteConnection(channel);
-    this.connection = new ControlConnection("control client", channel,
-        (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
+    connection = new ControlConnection(channel, "control client", config,
+        config.getAuthMechanismToUse() == null
+            ? config.getMessageHandler()
+            : new FailingRequestHandler<ControlConnection>(),
+        this);
     return connection;
   }
 
@@ -89,14 +90,36 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
   }
 
   @Override
-  protected Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    return handler.handle(connection, rpcType, pBody, dBody);
+  protected void handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+                        ResponseSender sender) throws RpcException {
+    connection.getCurrentHandler().handle(connection, rpcType, pBody, dBody, sender);
   }
 
   @Override
   protected void validateHandshake(BitControlHandshake handshake) throws RpcException {
     if (handshake.getRpcVersion() != ControlRpcConfig.RPC_VERSION) {
-      throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", handshake.getRpcVersion(), ControlRpcConfig.RPC_VERSION));
+      throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.",
+          handshake.getRpcVersion(), ControlRpcConfig.RPC_VERSION));
+    }
+
+    if (handshake.getAuthenticationMechanismsCount() != 0) { // remote requires authentication
+      final SaslClient saslClient;
+      try {
+        saslClient = config.getAuthFactory(handshake.getAuthenticationMechanismsList())
+            .createSaslClient(UserGroupInformation.getLoginUser(),
+                config.getSaslClientProperties(remoteEndpoint));
+      } catch (final IOException e) {
+        throw new RpcException(String.format("Failed to initiate authenticate to %s", remoteEndpoint.getAddress()), e);
+      }
+      if (saslClient == null) {
+        throw new RpcException("Unexpected failure. Could not initiate SASL exchange.");
+      }
+      connection.setSaslClient(saslClient);
+    } else {
+      if (config.getAuthMechanismToUse() != null) { // local requires authentication
+        throw new RpcException(String.format("Drillbit (%s) does not require auth, but auth is enabled.",
+                remoteEndpoint.getAddress()));
+      }
     }
   }
 
@@ -105,8 +128,84 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
     connection.setEndpoint(handshake.getEndpoint());
   }
 
-  public ControlConnection getConnection() {
-    return this.connection;
+  @Override
+  protected <M extends MessageLite> RpcCommand<M, ControlConnection>
+  getInitialCommand(final RpcCommand<M, ControlConnection> command) {
+    final RpcCommand<M, ControlConnection> initialCommand = super.getInitialCommand(command);
+    if (config.getAuthMechanismToUse() == null) {
+      return initialCommand;
+    } else {
+      return new AuthenticationCommand<>(initialCommand);
+    }
+  }
+
+  private class AuthenticationCommand<M extends MessageLite> implements RpcCommand<M, ControlConnection> {
+
+    private final RpcCommand<M, ControlConnection> command;
+
+    AuthenticationCommand(RpcCommand<M, ControlConnection> command) {
+      this.command = command;
+    }
+
+    @Override
+    public void connectionAvailable(ControlConnection connection) {
+      command.connectionFailed(FailureType.AUTHENTICATION, new SaslException("Should not reach here."));
+    }
+
+    @Override
+    public void connectionSucceeded(final ControlConnection connection) {
+      final UserGroupInformation loginUser;
+      try {
+        loginUser = UserGroupInformation.getLoginUser();
+      } catch (final IOException e) {
+        logger.debug("Unexpected failure trying to login.", e);
+        command.connectionFailed(FailureType.AUTHENTICATION, e);
+        return;
+      }
+
+      final SettableFuture<Void> future = SettableFuture.create();
+      new AuthenticationOutcomeListener<>(ControlClient.this, connection, RpcType.SASL_MESSAGE,
+          loginUser,
+          new RpcOutcomeListener<Void>() {
+            @Override
+            public void failed(RpcException ex) {
+              logger.debug("Authentication failed.", ex);
+              future.setException(ex);
+            }
+
+            @Override
+            public void success(Void value, ByteBuf buffer) {
+              connection.changeHandlerTo(config.getMessageHandler());
+              future.set(null);
+            }
+
+            @Override
+            public void interrupted(InterruptedException e) {
+              logger.debug("Authentication failed.", e);
+              future.setException(e);
+            }
+          }).initiate(config.getAuthMechanismToUse());
+
+
+      try {
+        logger.trace("Waiting until authentication completes..");
+        future.get();
+        command.connectionSucceeded(connection);
+      } catch (InterruptedException e) {
+        command.connectionFailed(FailureType.AUTHENTICATION, e);
+        // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+        // interruption and respond to it if it wants to.
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        command.connectionFailed(FailureType.AUTHENTICATION, e);
+      }
+    }
+
+    @Override
+    public void connectionFailed(FailureType type, Throwable t) {
+      logger.debug("Authentication failed.", t);
+      command.connectionFailed(FailureType.AUTHENTICATION, t);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
index 179a2f4..a50a3b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
@@ -17,35 +17,40 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import com.google.protobuf.MessageLite;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.socket.SocketChannel;
-
-import java.util.UUID;
-
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.AbstractServerConnection;
+import org.apache.drill.exec.rpc.ClientConnection;
+import org.apache.drill.exec.rpc.RequestHandler;
 import org.apache.drill.exec.rpc.RpcBus;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.slf4j.Logger;
 
-import com.google.protobuf.MessageLite;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkState;
 
-public class ControlConnection extends RemoteConnection {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnection.class);
+public class ControlConnection extends AbstractServerConnection<ControlConnection> implements ClientConnection {
+  private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnection.class);
 
   private final RpcBus<RpcType, ControlConnection> bus;
-  private final BufferAllocator allocator;
+  private final UUID id;
+
   private volatile DrillbitEndpoint endpoint;
   private volatile boolean active = false;
-  private final UUID id;
 
-  public ControlConnection(String name, SocketChannel channel, RpcBus<RpcType, ControlConnection> bus,
-      BufferAllocator allocator) {
-    super(channel, name);
+  private SaslClient saslClient;
+
+  ControlConnection(SocketChannel channel, String name, ControlConnectionConfig config,
+                    RequestHandler<ControlConnection> handler, RpcBus<RpcType, ControlConnection> bus) {
+    super(channel, name, config, handler);
     this.bus = bus;
     this.id = UUID.randomUUID();
-    this.allocator = allocator;
   }
 
   void setEndpoint(DrillbitEndpoint endpoint) {
@@ -54,24 +59,18 @@ public class ControlConnection extends RemoteConnection {
     active = true;
   }
 
-  protected DrillbitEndpoint getEndpoint() {
-    return endpoint;
-  }
-
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener,
-      RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+  public <SEND extends MessageLite, RECEIVE extends MessageLite>
+  void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType, SEND protobufBody,
+            Class<RECEIVE> clazz, ByteBuf... dataBodies) {
     bus.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies);
   }
 
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> void sendUnsafe(RpcOutcomeListener<RECEIVE> outcomeListener,
-      RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+  public <SEND extends MessageLite, RECEIVE extends MessageLite>
+  void sendUnsafe(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType, SEND protobufBody,
+                  Class<RECEIVE> clazz, ByteBuf... dataBodies) {
     bus.send(outcomeListener, this, rpcType, protobufBody, clazz, true, dataBodies);
   }
 
-  public void disable() {
-    active = false;
-  }
-
   @Override
   public boolean isActive() {
     return active;
@@ -108,8 +107,33 @@ public class ControlConnection extends RemoteConnection {
   }
 
   @Override
-  public BufferAllocator getAllocator() {
-    return allocator;
+  protected Logger getLogger() {
+    return logger;
+  }
+
+  @Override
+  public void setSaslClient(final SaslClient saslClient) {
+    checkState(this.saslClient == null);
+    this.saslClient = saslClient;
+  }
+
+  @Override
+  public SaslClient getSaslClient() {
+    checkState(saslClient != null);
+    return saslClient;
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (saslClient != null) {
+        saslClient.dispose();
+        saslClient = null;
+      }
+    } catch (final SaslException e) {
+      getLogger().warn("Unclean disposal", e);
+    }
+    super.close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
new file mode 100644
index 0000000..b19fb8b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.BitConnectionConfig;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.ControlMessageHandler;
+
+// config for bit to bit connection
+// package private
+class ControlConnectionConfig extends BitConnectionConfig {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionConfig.class);
+
+  private final ControlMessageHandler handler;
+
+  ControlConnectionConfig(BufferAllocator allocator, BootStrapContext context, ControlMessageHandler handler)
+      throws DrillbitStartupException {
+    super(allocator, context);
+    this.handler = handler;
+  }
+
+  @Override
+  public String getName() {
+    return "control"; // unused
+  }
+
+  ControlMessageHandler getMessageHandler() {
+    return handler;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
index 611b727..b31ffa7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
@@ -17,13 +17,10 @@
  */
 package org.apache.drill.exec.rpc.control;
 
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.BasicClient;
 import org.apache.drill.exec.rpc.ReconnectingConnection;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.work.batch.ControlMessageHandler;
 
 /**
  * Maintains connection between two particular bits.
@@ -31,34 +28,26 @@ import org.apache.drill.exec.work.batch.ControlMessageHandler;
 public class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class);
 
-  private final DrillbitEndpoint endpoint;
-  private final ControlMessageHandler handler;
-  private final BootStrapContext context;
-  private final DrillbitEndpoint localIdentity;
-  private final BufferAllocator allocator;
-
-  public ControlConnectionManager(BufferAllocator allocator, DrillbitEndpoint remoteEndpoint,
-      DrillbitEndpoint localIdentity, ControlMessageHandler handler, BootStrapContext context) {
-    super(BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getControlPort());
-    assert remoteEndpoint != null : "Endpoint cannot be null.";
-    assert remoteEndpoint.getAddress() != null && !remoteEndpoint.getAddress().isEmpty(): "Endpoint address cannot be null.";
-    assert remoteEndpoint.getControlPort() > 0 : String.format("Bit Port must be set to a port between 1 and 65k.  Was set to %d.", remoteEndpoint.getControlPort());
-
-    this.allocator = allocator;
-    this.endpoint = remoteEndpoint;
-    this.localIdentity = localIdentity;
-    this.handler = handler;
-    this.context = context;
+  private final ControlConnectionConfig config;
+  private final DrillbitEndpoint remoteEndpoint;
+
+  public ControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint,
+                                  DrillbitEndpoint remoteEndpoint) {
+    super(
+        BitControlHandshake.newBuilder()
+            .setRpcVersion(ControlRpcConfig.RPC_VERSION)
+            .setEndpoint(localEndpoint)
+            .build(),
+        remoteEndpoint.getAddress(),
+        remoteEndpoint.getControlPort());
+
+    this.config = config;
+    this.remoteEndpoint = remoteEndpoint;
   }
 
   @Override
   protected BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient() {
-    return new ControlClient(allocator, endpoint, localIdentity, handler, context, new CloseHandlerCreator());
-  }
-
-
-  public DrillbitEndpoint getEndpoint() {
-    return endpoint;
+    return new ControlClient(config, remoteEndpoint, new CloseHandlerCreator());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index ec09a98..562cd3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcConfig;
@@ -52,10 +53,11 @@ public class ControlRpcConfig {
         .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
         .add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_CUSTOM, CustomMessage.class, RpcType.RESP_CUSTOM, CustomMessage.class)
+        .add(RpcType.SASL_MESSAGE, SaslMessage.class, RpcType.SASL_MESSAGE, SaslMessage.class)
         .build();
   }
 
-  public static int RPC_VERSION = 3;
+  public static final int RPC_VERSION = 3;
 
   public static final Response OK = new Response(RpcType.ACK, Acks.OK);
   public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL);

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index a786469..9e733df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.rpc.control;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
@@ -28,29 +27,25 @@ import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.rpc.BasicServer;
 import org.apache.drill.exec.rpc.OutOfMemoryHandler;
 import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.work.batch.ControlMessageHandler;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
 
 import com.google.protobuf.MessageLite;
 
 public class ControlServer extends BasicServer<RpcType, ControlConnection>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlServer.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlServer.class);
 
-  private final ControlMessageHandler handler;
+  private final ControlConnectionConfig config;
   private final ConnectionManagerRegistry connectionRegistry;
   private volatile ProxyCloseHandler proxyCloseHandler;
-  private BufferAllocator allocator;
-
-  public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) {
-    super(
-        ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
-        context.getAllocator().getAsByteBufAllocator(),
-        context.getBitLoopGroup());
-    this.handler = handler;
+
+  public ControlServer(ControlConnectionConfig config, ConnectionManagerRegistry connectionRegistry) {
+    super(ControlRpcConfig.getMapping(config.getBootstrapContext().getConfig(),
+        config.getBootstrapContext().getExecutor()),
+        config.getAllocator().getAsByteBufAllocator(),
+        config.getBootstrapContext().getBitLoopGroup());
+    this.config = config;
     this.connectionRegistry = connectionRegistry;
-    this.allocator = context.getAllocator();
   }
 
   @Override
@@ -59,20 +54,20 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
   }
 
   @Override
-  protected Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    return handler.handle(connection, rpcType, pBody, dBody);
-  }
-
-  @Override
   protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection connection) {
     this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
     return proxyCloseHandler;
   }
 
   @Override
-  public ControlConnection initRemoteConnection(SocketChannel channel) {
+  protected ControlConnection initRemoteConnection(SocketChannel channel) {
     super.initRemoteConnection(channel);
-    return new ControlConnection("control server", channel, this, allocator);
+    return new ControlConnection(channel, "control server", config,
+        config.getAuthMechanismToUse() == null
+            ? config.getMessageHandler()
+            : new ServerAuthenticationHandler<>(config.getMessageHandler(),
+            RpcType.SASL_MESSAGE_VALUE, RpcType.SASL_MESSAGE),
+        this);
   }
 
 
@@ -84,10 +79,14 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
       public MessageLite getHandshakeResponse(BitControlHandshake inbound) throws Exception {
 //        logger.debug("Handling handshake from other bit. {}", inbound);
         if (inbound.getRpcVersion() != ControlRpcConfig.RPC_VERSION) {
-          throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", inbound.getRpcVersion(), ControlRpcConfig.RPC_VERSION));
+          throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.",
+              inbound.getRpcVersion(), ControlRpcConfig.RPC_VERSION));
         }
-        if (!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getControlPort() < 1) {
-          throw new RpcException(String.format("RPC didn't provide valid counter endpoint information.  Received %s.", inbound.getEndpoint()));
+        if (!inbound.hasEndpoint() ||
+            inbound.getEndpoint().getAddress().isEmpty() ||
+            inbound.getEndpoint().getControlPort() < 1) {
+          throw new RpcException(String.format("RPC didn't provide valid counter endpoint information.  Received %s.",
+                  inbound.getEndpoint()));
         }
         connection.setEndpoint(inbound.getEndpoint());
 
@@ -95,19 +94,25 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
         ControlConnectionManager manager = connectionRegistry.getConnectionManager(inbound.getEndpoint());
 
         // update the close handler.
-        proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection, proxyCloseHandler.getHandler()));
+        proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection,
+            proxyCloseHandler.getHandler()));
 
         // add to the connection manager.
         manager.addExternalConnection(connection);
 
-        return BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).build();
+        final BitControlHandshake.Builder builder = BitControlHandshake.newBuilder();
+        builder.setRpcVersion(ControlRpcConfig.RPC_VERSION);
+        if (config.getAuthMechanismToUse() != null) {
+          builder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
+        }
+        return builder.build();
       }
 
     };
   }
 
   @Override
-  public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+  protected ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
     return new ControlProtobufLengthDecoder(allocator, outOfMemoryHandler);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 9b46a7a..bb0fda3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -27,7 +27,6 @@ import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.InitializeFragments;
 import org.apache.drill.exec.proto.BitControl.RpcType;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -57,15 +56,9 @@ public class ControlTunnel {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlTunnel.class);
 
   private final ControlConnectionManager manager;
-  private final DrillbitEndpoint endpoint;
 
-  public ControlTunnel(DrillbitEndpoint endpoint, ControlConnectionManager manager) {
+  public ControlTunnel(ControlConnectionManager manager) {
     this.manager = manager;
-    this.endpoint = endpoint;
-  }
-
-  public DrillbitEndpoint getEndpoint(){
-    return manager.getEndpoint();
   }
 
   public void sendFragments(RpcOutcomeListener<Ack> outcomeListener, InitializeFragments fragments){

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
index a5f470c..6b2ee4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
@@ -44,7 +44,8 @@ public interface Controller extends AutoCloseable {
    */
   public ControlTunnel getTunnel(DrillbitEndpoint node) ;
 
-  public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException;
+  public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, boolean allowPortHunting)
+      throws DrillbitStartupException;
 
   /**
    * Register a new handler for custom message types. Should be done before any messages. This is threadsafe as this

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index 482f117..4991816 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -36,39 +36,34 @@ import com.google.protobuf.Parser;
  * Manages communication tunnels between nodes.
  */
 public class ControllerImpl implements Controller {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControllerImpl.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControllerImpl.class);
 
   private volatile ControlServer server;
-  private final ControlMessageHandler handler;
-  private final BootStrapContext context;
   private final ConnectionManagerRegistry connectionRegistry;
-  private final boolean allowPortHunting;
   private final CustomHandlerRegistry handlerRegistry;
+  private final ControlConnectionConfig config;
 
-  public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, BufferAllocator allocator,
-      boolean allowPortHunting) {
-    super();
-    this.handler = handler;
-    this.context = context;
-    this.connectionRegistry = new ConnectionManagerRegistry(allocator, handler, context);
-    this.allowPortHunting = allowPortHunting;
+  public ControllerImpl(BootStrapContext context, BufferAllocator allocator, ControlMessageHandler handler)
+      throws DrillbitStartupException {
+    config = new ControlConnectionConfig(allocator, context, handler);
+    this.connectionRegistry = new ConnectionManagerRegistry(config);
     this.handlerRegistry = handler.getHandlerRegistry();
   }
 
   @Override
-  public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException {
-    server = new ControlServer(handler, context, connectionRegistry);
-    int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
+  public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, final boolean allowPortHunting) {
+    server = new ControlServer(config, connectionRegistry);
+    int port = config.getBootstrapContext().getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
     port = server.bind(port, allowPortHunting);
     DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build();
-    connectionRegistry.setEndpoint(completeEndpoint);
+    connectionRegistry.setLocalEndpoint(completeEndpoint);
     handlerRegistry.setEndpoint(completeEndpoint);
     return completeEndpoint;
   }
 
   @Override
   public ControlTunnel getTunnel(DrillbitEndpoint endpoint) {
-    return new ControlTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint));
+    return new ControlTunnel(connectionRegistry.getConnectionManager(endpoint));
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
index 7065201..5360cc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
 import org.apache.drill.exec.rpc.RpcException;
 
 import com.google.protobuf.MessageLite;
@@ -49,6 +50,8 @@ public class DefaultInstanceHandler {
       return QueryProfile.getDefaultInstance();
     case RpcType.RESP_CUSTOM_VALUE:
       return CustomMessage.getDefaultInstance();
+    case RpcType.SASL_MESSAGE_VALUE:
+      return SaslMessage.getDefaultInstance();
     default:
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 697616e..c352861 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -260,7 +260,8 @@ public class WorkManager implements AutoCloseable {
 
     /**
      * Currently used to start a root fragment that is blocked on data, and intermediate fragments. This method is
-     * called, when the first batch arrives, by {@link org.apache.drill.exec.rpc.data.DataServer#handle}
+     * called, when the first batch arrives.
+     *
      * @param fragmentManager the manager for the fragment
      */
     public void startFragmentPendingRemote(final FragmentManager fragmentManager) {

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 77c069b..58c1df5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -34,7 +34,9 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.RequestHandler;
 import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.RpcConstants;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.UserRpcException;
@@ -50,7 +52,7 @@ import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
 import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
 
-public class ControlMessageHandler {
+public class ControlMessageHandler implements RequestHandler<ControlConnection> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
   private final WorkerBee bee;
   private final CustomHandlerRegistry handlerRegistry = new CustomHandlerRegistry();
@@ -59,8 +61,9 @@ public class ControlMessageHandler {
     this.bee = bee;
   }
 
-  public Response handle(final ControlConnection connection, final int rpcType,
-      final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
+  @Override
+  public void handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+                     ResponseSender sender) throws RpcException {
     if (RpcConstants.EXTRA_DEBUGGING) {
       logger.debug("Received bit com message of type {}", rpcType);
     }
@@ -70,34 +73,39 @@ public class ControlMessageHandler {
     case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
       final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
       cancelFragment(handle);
-      return ControlRpcConfig.OK;
+      sender.send(ControlRpcConfig.OK);
+      break;
     }
 
     case RpcType.REQ_CUSTOM_VALUE: {
       final CustomMessage customMessage = get(pBody, CustomMessage.PARSER);
-      return handlerRegistry.handle(customMessage, (DrillBuf) dBody);
+      sender.send(handlerRegistry.handle(customMessage, (DrillBuf) dBody));
+      break;
     }
 
     case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
       final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
       receivingFragmentFinished(finishedReceiver);
-      return ControlRpcConfig.OK;
+      sender.send(ControlRpcConfig.OK);
+      break;
     }
 
     case RpcType.REQ_FRAGMENT_STATUS_VALUE:
       bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
       // TODO: Support a type of message that has no response.
-      return ControlRpcConfig.OK;
+      sender.send(ControlRpcConfig.OK);
+      break;
 
     case RpcType.REQ_QUERY_CANCEL_VALUE: {
       final QueryId queryId = get(pBody, QueryId.PARSER);
       final Foreman foreman = bee.getForemanForQueryId(queryId);
       if (foreman != null) {
         foreman.cancel();
-        return ControlRpcConfig.OK;
+        sender.send(ControlRpcConfig.OK);
       } else {
-        return ControlRpcConfig.FAIL;
+        sender.send(ControlRpcConfig.FAIL);
       }
+      break;
     }
 
     case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
@@ -105,7 +113,8 @@ public class ControlMessageHandler {
       for(int i = 0; i < fragments.getFragmentCount(); i++) {
         startNewRemoteFragment(fragments.getFragment(i));
       }
-      return ControlRpcConfig.OK;
+      sender.send(ControlRpcConfig.OK);
+      break;
     }
 
     case RpcType.REQ_QUERY_STATUS_VALUE: {
@@ -115,13 +124,15 @@ public class ControlMessageHandler {
         throw new RpcException("Query not running on node.");
       }
       final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
-      return new Response(RpcType.RESP_QUERY_STATUS, profile);
+      sender.send(new Response(RpcType.RESP_QUERY_STATUS, profile));
+      break;
     }
 
     case RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
       final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
       resumeFragment(handle);
-      return ControlRpcConfig.OK;
+      sender.send(ControlRpcConfig.OK);
+      break;
     }
 
     default:

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 3e7a693..7cffa0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -62,7 +62,7 @@ public class NonRootFragmentManager implements FragmentManager {
   }
 
   /* (non-Javadoc)
-   * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
+   * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.AbstractRemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
    */
   @Override
   public boolean handle(final IncomingDataBatch batch) throws FragmentSetupException, IOException {

http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index ecf6f6a..9c2ba2f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -149,7 +149,8 @@ drill.exec: {
     enabled: false
   },
   security.bit.auth {
-    enabled : false
+    enabled: false
+    use_login_principal: false
   }
   trace: {
     directory: "/tmp/drill-trace",


Mime
View raw message