drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject [2/6] drill git commit: DRILL-4335: Apache Drill should support network encryption.
Date Sat, 20 May 2017 23:52:43 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
new file mode 100644
index 0000000..b9dd705
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user.security;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.NonTransientRpcException;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.security.KerberosHelper;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.security.auth.Subject;
+import java.lang.reflect.Field;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+
+import static junit.framework.TestCase.fail;
+
+@Ignore("See DRILL-5387")
+public class TestUserBitKerberosEncryption extends BaseTestQuery {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(TestUserBitKerberosEncryption.class);
+
+  private static KerberosHelper krbHelper;
+  private static DrillConfig newConfig;
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    krbHelper = new KerberosHelper(TestUserBitKerberosEncryption.class.getSimpleName());
+    krbHelper.setupKdc();
+
+    // Create a new DrillConfig which has user authentication enabled and authenticator set to
+    // UserAuthenticatorTestImpl.
+    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+            ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+            ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+            ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+            ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+            ConfigValueFactory.fromAnyRef(true)),
+      false);
+
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+
+    // Ignore the compile time warning caused by the code below.
+
+    // Config is statically initialized at this point. But the above configuration results in a different
+    // initialization which causes the tests to fail. So the following two changes are required.
+
+    // (1) Refresh Kerberos config.
+    sun.security.krb5.Config.refresh();
+    // (2) Reset the default realm.
+    final Field defaultRealm = KerberosName.class.getDeclaredField("defaultRealm");
+    defaultRealm.setAccessible(true);
+    defaultRealm.set(null, KerberosUtil.getDefaultRealm());
+
+    // Start a secure cluster with client using Kerberos related parameters.
+    updateTestCluster(1, newConfig, connectionProps);
+  }
+
+  @AfterClass
+  public static void cleanTest() throws Exception {
+    krbHelper.stopKdc();
+  }
+
+  @Test
+  public void successKeytabWithoutChunking() throws Exception {
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+    updateClient(connectionProps);
+
+    // Run a few queries using the new client
+    testBuilder()
+        .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+        .unOrdered()
+        .baselineColumns("session_user")
+        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+        .go();
+    test("SHOW SCHEMAS");
+    test("USE INFORMATION_SCHEMA");
+    test("SHOW TABLES");
+    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+    test("SELECT * FROM cp.`region.json`");
+  }
+
+  @Test
+  public void successTicketWithoutChunking() throws Exception {
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true");
+    final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
+                                                               krbHelper.clientKeytab.getAbsoluteFile());
+
+    Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        updateClient(connectionProps);
+        return null;
+      }
+    });
+
+    // Run a few queries using the new client
+    testBuilder()
+        .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+        .unOrdered()
+        .baselineColumns("session_user")
+        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+        .go();
+    test("SHOW SCHEMAS");
+    test("USE INFORMATION_SCHEMA");
+    test("SHOW TABLES");
+    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+    test("SELECT * FROM cp.`region.json` LIMIT 5");
+  }
+
+  @Test
+  public void successKeytabWithChunking() throws Exception {
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+
+    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+      .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+      .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE,
+        ConfigValueFactory.fromAnyRef(100))
+      ,false);
+
+    updateTestCluster(1, newConfig, connectionProps);
+
+    // Run a few queries using the new client
+    testBuilder()
+      .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+      .unOrdered()
+      .baselineColumns("session_user")
+      .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+      .go();
+    test("SHOW SCHEMAS");
+    test("USE INFORMATION_SCHEMA");
+    test("SHOW TABLES");
+    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+    test("SELECT * FROM cp.`region.json`");
+  }
+
+  @Test
+  public void successKeytabWithChunkingDefaultChunkSize() throws Exception {
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+
+    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+      .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+      .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      ,false);
+
+    updateTestCluster(1, newConfig, connectionProps);
+
+    // Run a few queries using the new client
+    testBuilder()
+      .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+      .unOrdered()
+      .baselineColumns("session_user")
+      .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+      .go();
+    test("SHOW SCHEMAS");
+    test("USE INFORMATION_SCHEMA");
+    test("SHOW TABLES");
+    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+    test("SELECT * FROM cp.`region.json` LIMIT 5");
+  }
+
+
+  /**
+   *  This test will not cover the data channel since we are using only 1 Drillbit and the query doesn't involve
+   *  any exchange operator. But Data Channel encryption testing is covered separately in
+   *  {@link org.apache.drill.exec.rpc.data.TestBitBitKerberos}
+   */
+  @Test
+  public void successEncryptionAllChannelChunkMode() throws Exception {
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+
+    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+      .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+      .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE,
+        ConfigValueFactory.fromAnyRef(10000))
+      .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+        ConfigValueFactory.fromAnyRef("kerberos"))
+      .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE,
+        ConfigValueFactory.fromAnyRef(10000))
+      ,false);
+
+    updateTestCluster(1, newConfig, connectionProps);
+
+    // Run a few queries using the new client
+    testBuilder()
+      .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+      .unOrdered()
+      .baselineColumns("session_user")
+      .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+      .go();
+    test("SHOW SCHEMAS");
+    test("USE INFORMATION_SCHEMA");
+    test("SHOW TABLES");
+    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+    test("SELECT * FROM cp.`region.json` LIMIT 5");
+  }
+
+
+
+  /**
+   *  This test will not cover the data channel since we are using only 1 Drillbit and the query doesn't involve
+   *  any exchange operator. But Data Channel encryption testing is covered separately in
+   *  {@link org.apache.drill.exec.rpc.data.TestBitBitKerberos}
+   */
+  @Test
+  public void successEncryptionAllChannel() throws Exception {
+
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+
+    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+      .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+      .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+        ConfigValueFactory.fromAnyRef("kerberos"))
+      .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      ,false);
+
+    updateTestCluster(1, newConfig, connectionProps);
+
+    // Run a few queries using the new client
+    testBuilder()
+      .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+      .unOrdered()
+      .baselineColumns("session_user")
+      .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+      .go();
+    test("SHOW SCHEMAS");
+    test("USE INFORMATION_SCHEMA");
+    test("SHOW TABLES");
+    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+    test("SELECT * FROM cp.`region.json` LIMIT 5");
+  }
+
+  @Test
+  public void failurePlainMech() {
+    try {
+      final Properties connectionProps = new Properties();
+      connectionProps.setProperty(DrillProperties.USER, "anonymous");
+      connectionProps.setProperty(DrillProperties.PASSWORD, "anything works!");
+
+      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+          ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+          ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+          ConfigValueFactory.fromAnyRef(true)),
+        false);
+
+      updateTestCluster(1, newConfig, connectionProps);
+      fail();
+    } catch (Exception ex) {
+      assert (ex.getCause() instanceof NonTransientRpcException);
+      System.out.println("Caught exception: " + ex.getMessage());
+      logger.info("Caught exception: " + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void encryptionEnabledWithOnlyPlainMech() {
+    try {
+      final Properties connectionProps = new Properties();
+      connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+
+      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+          ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+          ConfigValueFactory.fromIterable(Lists.newArrayList("plain")))
+        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+          ConfigValueFactory.fromAnyRef(true)),
+        false);
+      updateTestCluster(1, newConfig, connectionProps);
+
+      fail();
+    } catch (Exception ex) {
+      assert (ex.getCause() instanceof NonTransientRpcException);
+      System.out.println("Caught exception: " + ex.getMessage());
+      logger.info("Caught exception: " + ex.getMessage());
+    }
+  }
+
+  /**
+   * Test to validate that older clients are not allowed to connect to a secure cluster
+   * with encryption enabled.
+   */
+  @Test
+  public void failureOldClientEncryptionEnabled() {
+    try {
+      final Properties connectionProps = new Properties();
+      connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+      connectionProps.setProperty(DrillProperties.TEST_SASL_LEVEL, "1");
+
+      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+          ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+          ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+          ConfigValueFactory.fromAnyRef(true)),
+        false);
+      updateTestCluster(1, newConfig, connectionProps);
+
+      fail();
+    } catch (Exception ex) {
+      assert (ex.getCause() instanceof RpcException);
+      System.out.println("Caught exception: " + ex.getMessage());
+      logger.info("Caught exception: " + ex.getMessage());
+    }
+  }
+
+  /**
+   * Test to validate that older clients can successfully connect to a secure cluster
+   * with encryption disabled.
+   */
+  @Test
+  public void successOldClientEncryptionDisabled() {
+
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+    connectionProps.setProperty(DrillProperties.TEST_SASL_LEVEL, "1");
+
+    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+      .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+      .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))),
+      false);
+
+    updateTestCluster(1, newConfig, connectionProps);
+  }
+
+  /**
+   * Test to validate that clients which need an encrypted connection fail to connect
+   * to a server with encryption disabled.
+   */
+  @Test
+  public void clientNeedsEncryptionWithNoServerSupport() throws Exception {
+    try {
+      final Properties connectionProps = new Properties();
+      connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+      connectionProps.setProperty(DrillProperties.SASL_ENCRYPT, "true");
+
+      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+          ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+          ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+        , false);
+
+      updateTestCluster(1, newConfig, connectionProps);
+
+      fail();
+    } catch (Exception ex) {
+      assert (ex.getCause() instanceof NonTransientRpcException);
+    }
+  }
+
+  /**
+   * Test to validate that clients which need an encrypted connection can connect
+   * to a server with encryption enabled.
+   */
+  @Test
+  public void clientNeedsEncryptionWithServerSupport() throws Exception {
+    try {
+      final Properties connectionProps = new Properties();
+      connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+      connectionProps.setProperty(DrillProperties.SASL_ENCRYPT, "true");
+
+      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+          ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+          ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+              ConfigValueFactory.fromAnyRef(true))
+        , false);
+
+      updateTestCluster(1, newConfig, connectionProps);
+    } catch (Exception ex) {
+      fail();
+      assert (ex.getCause() instanceof NonTransientRpcException);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
index 59ed85b..face1af 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
@@ -20,27 +20,35 @@ package org.apache.drill.exec.rpc;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
 import java.net.SocketAddress;
+import java.nio.ByteOrder;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 
-public abstract class AbstractRemoteConnection implements RemoteConnection {
+public abstract class AbstractRemoteConnection implements RemoteConnection, EncryptionContext {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractRemoteConnection.class);
 
   private final Channel channel;
   private final WriteManager writeManager;
   private final RequestIdMap requestIdMap = new RequestIdMap();
   private final String clientName;
-
   private String name;
 
-  public AbstractRemoteConnection(SocketChannel channel, String name) {
+  // Encryption related parameters
+  private final EncryptionContext encryptionContext;
+  // SaslCodec to hold instance of SaslClient/SaslServer
+  protected SaslCodec saslCodec;
+
+  public AbstractRemoteConnection(SocketChannel channel, String name, EncryptionContext encryptionContext) {
     this.channel = channel;
     this.clientName = name;
     this.writeManager = new WriteManager();
+    this.encryptionContext = new EncryptionContextImpl(encryptionContext);
     channel.pipeline().addLast(new BackPressureHandler());
   }
 
@@ -224,4 +232,98 @@ public abstract class AbstractRemoteConnection implements RemoteConnection {
     }
   }
 
+  /**
+   * Helps to add all the required security handlers after negotiation for encryption is completed.
+   * <p>Handlers in the pipeline before encryption is negotiated:</p>
+   * <ul>
+   *    <li>  PROTOCOL_DECODER {@link ProtobufLengthDecoder} </li>
+   *    <li>  MESSAGE_DECODER {@link RpcDecoder}  </li>
+   *    <li>  PROTOCOL_ENCODER {@link RpcEncoder} </li>
+   *    <li>  HANDSHAKE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.ClientHandshakeHandler} OR
+   *                            {@link org.apache.drill.exec.rpc.BasicServer.ServerHandshakeHandler}  </li>
+   *    <li>  optional - IDLE_STATE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.IdlePingHandler} OR
+   *                   - TIMEOUT_HANDLER {@link org.apache.drill.exec.rpc.BasicServer.LoggingReadTimeoutHandler}  </li>
+   *    <li>  MESSAGE_HANDLER {@link org.apache.drill.exec.rpc.RpcBus.InboundHandler} </li>
+   *    <li>  EXCEPTION_HANDLER {@link RpcExceptionHandler} </li>
+   * </ul>
+   * <p>Handlers in the pipeline after encryption is negotiated:</p>
+   * <ul>
+   *    <li>  LENGTH_DECODER_HANDLER {@link LengthFieldBasedFrameDecoder}
+   *    <li>  SASL_DECRYPTION_HANDLER {@link SaslDecryptionHandler}
+   *    <li>  PROTOCOL_DECODER {@link ProtobufLengthDecoder}
+   *    <li>  MESSAGE_DECODER {@link RpcDecoder}
+   *    <li>  SASL_ENCRYPTION_HANDLER {@link SaslEncryptionHandler}
+   *    <li>  CHUNK_CREATION_HANDLER {@link ChunkCreationHandler}
+   *    <li>  PROTOCOL_ENCODER {@link RpcEncoder}
+   *    <li>  HANDSHAKE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.ClientHandshakeHandler} OR
+   *                            {@link org.apache.drill.exec.rpc.BasicServer.ServerHandshakeHandler}
+   *    <li>  optional - IDLE_STATE_HANDLER {@link org.apache.drill.exec.rpc.BasicClient.IdlePingHandler} OR
+   *                   - TIMEOUT_HANDLER {@link org.apache.drill.exec.rpc.BasicServer.LoggingReadTimeoutHandler}
+   *    <li>  MESSAGE_HANDLER {@link org.apache.drill.exec.rpc.RpcBus.InboundHandler}
+   *    <li>  EXCEPTION_HANDLER {@link RpcExceptionHandler}
+   * </ul>
+   * <p>
+   *  If encryption is enabled ChunkCreationHandler is always added to divide the Rpc message into chunks of
+   *  negotiated {@link EncryptionContextImpl#wrapSizeLimit} bytes. This helps to make a generic encryption handler.
+   * </p>
+   */
+  @Override
+  public void addSecurityHandlers() {
+
+    final ChannelPipeline channelPipeline = getChannel().pipeline();
+    channelPipeline.addFirst(RpcConstants.SASL_DECRYPTION_HANDLER,
+        new SaslDecryptionHandler(saslCodec, getMaxWrappedSize(), OutOfMemoryHandler.DEFAULT_INSTANCE));
+
+    channelPipeline.addFirst(RpcConstants.LENGTH_DECODER_HANDLER,
+        new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE,
+            RpcConstants.LENGTH_FIELD_OFFSET, RpcConstants.LENGTH_FIELD_LENGTH,
+            RpcConstants.LENGTH_ADJUSTMENT, RpcConstants.INITIAL_BYTES_TO_STRIP, true));
+
+    channelPipeline.addAfter(RpcConstants.MESSAGE_DECODER, RpcConstants.SASL_ENCRYPTION_HANDLER,
+        new SaslEncryptionHandler(saslCodec, getWrapSizeLimit(),
+            OutOfMemoryHandler.DEFAULT_INSTANCE));
+
+    channelPipeline.addAfter(RpcConstants.SASL_ENCRYPTION_HANDLER, RpcConstants.CHUNK_CREATION_HANDLER,
+        new ChunkCreationHandler(getWrapSizeLimit()));
+  }
+
+  public abstract void incConnectionCounter();
+
+  public abstract void decConnectionCounter();
+
+  @Override
+  public void setEncryption(boolean encrypted) {
+    encryptionContext.setEncryption(encrypted);
+  }
+
+  @Override
+  public boolean isEncryptionEnabled() {
+    return encryptionContext.isEncryptionEnabled();
+  }
+
+  @Override
+  public String getEncryptionCtxtString() {
+    return encryptionContext.toString();
+  }
+
+  @Override
+  public void setMaxWrappedSize(int maxWrappedChunkSize) {
+    encryptionContext.setMaxWrappedSize(maxWrappedChunkSize);
+  }
+
+  @Override
+  public int getMaxWrappedSize() {
+    return encryptionContext.getMaxWrappedSize();
+  }
+
+  @Override
+  public void setWrapSizeLimit(int wrapSizeLimit) {
+    encryptionContext.setWrapSizeLimit(wrapSizeLimit);
+  }
+
+  @Override
+  public int getWrapSizeLimit() {
+    return encryptionContext.getWrapSizeLimit();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index e418c5a..d51b748 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -101,17 +101,17 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
 
             final ChannelPipeline pipe = ch.pipeline();
 
-            pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator()));
-            pipe.addLast("message-decoder", new RpcDecoder("c-" + rpcConfig.getName()));
-            pipe.addLast("protocol-encoder", new RpcEncoder("c-" + rpcConfig.getName()));
-            pipe.addLast("handshake-handler", new ClientHandshakeHandler(connection));
+            pipe.addLast(RpcConstants.PROTOCOL_DECODER, getDecoder(connection.getAllocator()));
+            pipe.addLast(RpcConstants.MESSAGE_DECODER, new RpcDecoder("c-" + rpcConfig.getName()));
+            pipe.addLast(RpcConstants.PROTOCOL_ENCODER, new RpcEncoder("c-" + rpcConfig.getName()));
+            pipe.addLast(RpcConstants.HANDSHAKE_HANDLER, new ClientHandshakeHandler(connection));
 
             if(pingHandler != null){
-              pipe.addLast("idle-state-handler", pingHandler);
+              pipe.addLast(RpcConstants.IDLE_STATE_HANDLER, pingHandler);
             }
 
-            pipe.addLast("message-handler", new InboundHandler(connection));
-            pipe.addLast("exception-handler", new RpcExceptionHandler<CC>(connection));
+            pipe.addLast(RpcConstants.MESSAGE_HANDLER, new InboundHandler(connection));
+            pipe.addLast(RpcConstants.EXCEPTION_HANDLER, new RpcExceptionHandler<CC>(connection));
           }
         }); //
 
@@ -323,5 +323,4 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
       connection.close();
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 7437b65..a7258dd 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -53,8 +53,6 @@ import com.google.protobuf.Parser;
 public abstract class BasicServer<T extends EnumLite, SC extends ServerConnection<SC>> extends RpcBus<T, SC> {
   final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
 
-  protected static final String TIMEOUT_HANDLER = "timeout-handler";
-
   private final ServerBootstrap b;
   private volatile boolean connect = false;
   private final EventLoopGroup eventLoopGroup;
@@ -84,18 +82,18 @@ public abstract class BasicServer<T extends EnumLite, SC extends ServerConnectio
             ch.closeFuture().addListener(getCloseHandler(ch, connection));
 
             final ChannelPipeline pipe = ch.pipeline();
-            pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator(), getOutOfMemoryHandler()));
-            pipe.addLast("message-decoder", new RpcDecoder("s-" + rpcConfig.getName()));
-            pipe.addLast("protocol-encoder", new RpcEncoder("s-" + rpcConfig.getName()));
-            pipe.addLast("handshake-handler", getHandshakeHandler(connection));
+            pipe.addLast(RpcConstants.PROTOCOL_DECODER, getDecoder(connection.getAllocator(), getOutOfMemoryHandler()));
+            pipe.addLast(RpcConstants.MESSAGE_DECODER, new RpcDecoder("s-" + rpcConfig.getName()));
+            pipe.addLast(RpcConstants.PROTOCOL_ENCODER, new RpcEncoder("s-" + rpcConfig.getName()));
+            pipe.addLast(RpcConstants.HANDSHAKE_HANDLER, getHandshakeHandler(connection));
 
             if (rpcMapping.hasTimeout()) {
-              pipe.addLast(TIMEOUT_HANDLER,
+              pipe.addLast(RpcConstants.TIMEOUT_HANDLER,
                   new LoggingReadTimeoutHandler(connection, rpcMapping.getTimeout()));
             }
 
-            pipe.addLast("message-handler", new InboundHandler(connection));
-            pipe.addLast("exception-handler", new RpcExceptionHandler<>(connection));
+            pipe.addLast(RpcConstants.MESSAGE_HANDLER, new InboundHandler(connection));
+            pipe.addLast(RpcConstants.EXCEPTION_HANDLER, new RpcExceptionHandler<>(connection));
 
             connect = true;
 //            logger.debug("Server connection initialization completed.");

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ChunkCreationHandler.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ChunkCreationHandler.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ChunkCreationHandler.java
new file mode 100644
index 0000000..b0c1ae0
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ChunkCreationHandler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.Math.min;
+
+/**
+ * Handler that slices an input ByteBuf into chunks of at most {@code chunkSize} bytes and
+ * adds them to a CompositeByteBuf as individual components. When encryption is enabled,
+ * this handler is always added to the channel pipeline (see the addSecurityHandlers
+ * pipeline setup) so each chunk can later be wrapped separately.
+ */
+class ChunkCreationHandler extends MessageToMessageEncoder<ByteBuf> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(
+      ChunkCreationHandler.class.getCanonicalName());
+
+  // Maximum number of bytes per chunk; validated to be strictly positive.
+  private final int chunkSize;
+
+  ChunkCreationHandler(int chunkSize) {
+    checkArgument(chunkSize > 0);
+    this.chunkSize = chunkSize;
+  }
+
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    super.handlerAdded(ctx);
+    logger.trace("Added " + RpcConstants.CHUNK_CREATION_HANDLER + " handler!");
+  }
+
+  @Override
+  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+    super.handlerRemoved(ctx);
+    logger.trace("Removed " + RpcConstants.CHUNK_CREATION_HANDLER + " handler");
+  }
+
+  @Override
+  protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
+
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("ChunkCreationHandler called with msg {} of size {} with chunkSize {}",
+          msg, msg.readableBytes(), chunkSize);
+    }
+
+    if (!ctx.channel().isOpen()) {
+      logger.debug("Channel closed, skipping encode inside {}.", RpcConstants.CHUNK_CREATION_HANDLER);
+      // NOTE(review): MessageToMessageEncoder releases the input message itself after
+      // encode() returns, so this explicit release looks like a double-release on the
+      // closed-channel path — confirm against the netty version in use.
+      msg.release();
+      return;
+    }
+
+    // Calculate the number of chunks based on configured chunk size and input msg size
+    int numChunks = (int) Math.ceil((double) msg.readableBytes() / chunkSize);
+
+    // Initialize a composite buffer to hold numChunks chunk.
+    final CompositeByteBuf cbb = ctx.alloc().compositeBuffer(numChunks);
+
+    int cbbWriteIndex = 0;
+    int currentChunkLen = min(msg.readableBytes(), chunkSize);
+
+    // Create slices of chunkSize from input msg and add it to the composite buffer.
+    // Each slice shares msg's memory; retain() keeps the underlying buffer alive for
+    // the composite's components after the framework releases msg itself.
+    while (numChunks > 0) {
+      final ByteBuf chunkBuf = msg.slice(msg.readerIndex(), currentChunkLen);
+      chunkBuf.retain();
+      cbb.addComponent(chunkBuf);
+      cbbWriteIndex += currentChunkLen;
+      msg.skipBytes(currentChunkLen);
+      --numChunks;
+      currentChunkLen = min(msg.readableBytes(), chunkSize);
+    }
+
+    // Update the writerIndex of composite byte buffer. Netty doesn't do it automatically.
+    cbb.writerIndex(cbbWriteIndex);
+
+    // Add the final composite bytebuf into output buffer.
+    out.add(cbb);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ClientConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ClientConnection.java
index 5393173..15e5cf8 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ClientConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ClientConnection.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.rpc;
 
 import javax.security.sasl.SaslClient;
 
-public interface ClientConnection extends RemoteConnection {
+public interface ClientConnection extends RemoteConnection, EncryptionContext {
 
   // set only once
   void setSaslClient(SaslClient saslClient);
@@ -27,4 +27,7 @@ public interface ClientConnection extends RemoteConnection {
   // get only after setting
   SaslClient getSaslClient();
 
+  // dispose the saslClient object
+  void disposeSaslClient();
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/EncryptionContext.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/EncryptionContext.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/EncryptionContext.java
new file mode 100644
index 0000000..dd9acdd
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/EncryptionContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.rpc;
+
+/**
+ * Tracks the encryption-related negotiation state of a connection: whether encryption
+ * is enabled, the maximum size of an encoded (wrapped) packet, and the plain-buffer
+ * size limit used when wrapping.
+ */
+public interface EncryptionContext {
+
+  /** @return true if encryption is required on this connection */
+  boolean isEncryptionEnabled();
+
+  /** Enables or disables encryption for this connection. */
+  void setEncryption(boolean encryptionEnabled);
+
+  /** Sets the maximum size (in bytes) of an encoded packet sent over the wire. */
+  void setMaxWrappedSize(int maxWrappedChunkSize);
+
+  int getMaxWrappedSize();
+
+  /** Sets the maximum plain-buffer size passed to a SASL wrap call; set after SASL negotiation. */
+  void setWrapSizeLimit(int wrapSizeLimit);
+
+  int getWrapSizeLimit();
+
+  /** @return human-readable summary of this encryption context, intended for logging */
+  String getEncryptionCtxtString();
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/EncryptionContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/EncryptionContextImpl.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/EncryptionContextImpl.java
new file mode 100644
index 0000000..4710823
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/EncryptionContextImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+/**
+ * Context to help initializing encryption related configurations for a connection.
+ * <ul>
+ * <li>encryptionEnabled  - identifies if encryption is required or not </li>
+ * <li>maxWrappedSize     - maximum size of the encoded packet that is sent over the wire.
+ *                          Recommended maximum value is {@link RpcConstants#MAX_RECOMMENDED_WRAPPED_SIZE}</li>
+ * <li>wrapSizeLimit      - maximum size of the plain buffer sent to the wrap call, which will produce an
+ *                          encrypted buffer &lt;= maxWrappedSize. Gets set after SASL negotiation.</li>
+ *</ul>
+ */
+public class EncryptionContextImpl implements EncryptionContext {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EncryptionContextImpl.class);
+
+  // Default maximum size (in bytes) of an encoded (wrapped) packet: 64 KiB.
+  private static final int DEFAULT_MAX_WRAPPED_SIZE = 65536;
+
+  // True when encryption is required on this connection; disabled by default.
+  private boolean encryptionEnabled;
+
+  // Maximum size (in bytes) of the encoded packet that is sent over the wire.
+  private int maxWrappedSize;
+
+  // Maximum plain-buffer size passed to a SASL wrap call; 0 until SASL negotiation sets it.
+  private int wrapSizeLimit;
+
+  EncryptionContextImpl() {
+    this.encryptionEnabled = false;
+    this.maxWrappedSize = DEFAULT_MAX_WRAPPED_SIZE;
+    this.wrapSizeLimit = 0;
+  }
+
+  /** Copy constructor: snapshots the state of another encryption context. */
+  EncryptionContextImpl(EncryptionContext context) {
+    this.encryptionEnabled = context.isEncryptionEnabled();
+    this.maxWrappedSize = context.getMaxWrappedSize();
+    this.wrapSizeLimit = context.getWrapSizeLimit();
+  }
+
+  @Override
+  public boolean isEncryptionEnabled() {
+    return encryptionEnabled;
+  }
+
+  @Override
+  public void setEncryption(boolean encryptionEnabled) {
+    this.encryptionEnabled = encryptionEnabled;
+  }
+
+  @Override
+  public int getMaxWrappedSize() {
+    return maxWrappedSize;
+  }
+
+  @Override
+  public void setMaxWrappedSize(int maxWrappedSize) {
+    this.maxWrappedSize = maxWrappedSize;
+  }
+
+  @Override
+  public String getEncryptionCtxtString() {
+    return toString();
+  }
+
+  @Override
+  public void setWrapSizeLimit(int wrapSizeLimit) {
+    this.wrapSizeLimit = wrapSizeLimit;
+  }
+
+  @Override
+  public int getWrapSizeLimit() {
+    return wrapSizeLimit;
+  }
+
+  private String getEncryptionString() {
+    return encryptionEnabled ? "enabled" : "disabled";
+  }
+
+  @Override
+  public String toString() {
+    // Built fresh on each call from mutable state; interning such dynamic strings only
+    // pollutes the JVM string pool with no benefit, so intern() was removed here.
+    return "Encryption: " + getEncryptionString() + " , MaxWrappedSize: " + maxWrappedSize + " , " +
+        "WrapSizeLimit: " + wrapSizeLimit;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index ffb7429..d5ddc00 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -51,6 +51,8 @@ public interface RemoteConnection extends ConnectionThrottle, AutoCloseable {
 
   SocketAddress getRemoteAddress();
 
+  void addSecurityHandlers();
+
   @Override
   void close();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
index 4be365c..be58f37 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,4 +24,29 @@ public class RpcConstants {
 
   public static final boolean SOME_DEBUGGING = false;
   public static final boolean EXTRA_DEBUGGING = false;
+
+  // RPC Handler names
+  public static final String TIMEOUT_HANDLER = "timeout-handler";
+  public static final String PROTOCOL_DECODER = "protocol-decoder";
+  public static final String PROTOCOL_ENCODER = "protocol-encoder";
+  public static final String MESSAGE_DECODER = "message-decoder";
+  public static final String HANDSHAKE_HANDLER = "handshake-handler";
+  public static final String MESSAGE_HANDLER = "message-handler";
+  public static final String EXCEPTION_HANDLER = "exception-handler";
+  public static final String IDLE_STATE_HANDLER = "idle-state-handler";
+  public static final String SASL_DECRYPTION_HANDLER = "sasl-decryption-handler";
+  public static final String SASL_ENCRYPTION_HANDLER = "sasl-encryption-handler";
+  public static final String LENGTH_DECODER_HANDLER = "length-decoder";
+  public static final String CHUNK_CREATION_HANDLER = "chunk-creation-handler";
+
+
+
+  // GSSAPI RFC 2222 allows only 3 octets to specify the length of maximum encoded buffer each side can receive.
+  // Hence the recommended maximum buffer size is kept as 16Mb i.e. 0XFFFFFF bytes.
+  public static final int MAX_RECOMMENDED_WRAPPED_SIZE = 0XFFFFFF;
+
+  public static final int LENGTH_FIELD_OFFSET = 0;
+  public static final int LENGTH_FIELD_LENGTH = 4;
+  public static final int LENGTH_ADJUSTMENT = 0;
+  public static final int INITIAL_BYTES_TO_STRIP = 0;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
index f9da6f1..19097bd 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
@@ -111,7 +111,7 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{
         cos.writeRawVarint32(rawBodyLength);
         cos.flush(); // need to flush so that dbody goes after if cos is caching.
 
-        CompositeByteBuf cbb = new CompositeByteBuf(buf.alloc(), true, msg.dBodies.length + 1);
+        final CompositeByteBuf cbb = ctx.alloc().compositeBuffer(msg.dBodies.length + 1);
         cbb.addComponent(buf);
         int bufLength = buf.readableBytes();
         for (ByteBuf b : msg.dBodies) {

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcMetrics.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcMetrics.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcMetrics.java
new file mode 100644
index 0000000..a737095
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcMetrics.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.rpc;
+
+/**
+ * Holder interface for all the metrics used in the RPC layer.
+ */
+public interface RpcMetrics {
+
+  /** Increments the count of active connections. */
+  void addConnectionCount();
+
+  /** Decrements the count of active connections. */
+  void decConnectionCount();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslCodec.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslCodec.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslCodec.java
new file mode 100644
index 0000000..582b91c
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslCodec.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.rpc;
+
+import javax.security.sasl.SaslException;
+
+/**
+ * Codec interface which helps to bind wrap/unwrap calls in the security handlers to the
+ * corresponding calls on SaslClient or SaslServer instances.
+ */
+public interface SaslCodec {
+
+  /** Encrypts {@code len} bytes of {@code data} starting at {@code offset}. */
+  byte[] wrap(byte[] data, int offset, int len) throws SaslException;
+
+  /** Decrypts {@code len} bytes of {@code data} starting at {@code offset}. */
+  byte[] unwrap(byte[] data, int offset, int len) throws SaslException;
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslDecryptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslDecryptionHandler.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslDecryptionHandler.java
new file mode 100644
index 0000000..52faf51
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslDecryptionHandler.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+
+/**
+ * Handler to decrypt the input ByteBuf. It expects the input to be framed as the length of
+ * the bytes to decode in network byte order followed by the actual encrypted bytes. The
+ * handler reads the length, reads that many bytes, and passes them to the unwrap function
+ * for decryption. The decrypted bytes are copied to a new ByteBuf and added to the out list.
+ * <p>
+ * Example:
+ * <li>Input - [EBLN1, EB1, EBLN2, EB2] --> ByteBuf with repeated combination of encrypted byte length
+ *             in network order (EBLNx) and encrypted bytes (EB)
+ * <li>Output - [DB1] --> Decrypted ByteBuf of first chunk.(EB1)
+ * </p>
+ */
+class SaslDecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(
+      SaslDecryptionHandler.class.getCanonicalName());
+
+  // Codec binding the unwrap call to the underlying SaslClient/SaslServer.
+  private final SaslCodec saslCodec;
+
+  // Negotiated maximum size (in bytes) of a single encoded (wrapped) packet.
+  private final int maxWrappedSize;
+
+  private final OutOfMemoryHandler outOfMemoryHandler;
+
+  // Reusable buffer holding the encrypted bytes of the chunk currently being decoded.
+  private final byte[] encodedMsg;
+
+  // Reusable 4-octet buffer for reading the big-endian length prefix of each chunk.
+  private final ByteBuffer lengthOctets;
+
+  SaslDecryptionHandler(SaslCodec saslCodec, int maxWrappedSize, OutOfMemoryHandler oomHandler) {
+    this.saslCodec = saslCodec;
+    this.outOfMemoryHandler = oomHandler;
+    this.maxWrappedSize = maxWrappedSize;
+
+    // Allocate the byte array of maxWrappedSize to reuse for each encoded packet received on this connection.
+    // Size of this buffer depends upon the configuration encryption.sasl.max_wrapped_size
+    encodedMsg = new byte[maxWrappedSize];
+    lengthOctets = ByteBuffer.allocate(RpcConstants.LENGTH_FIELD_LENGTH);
+    lengthOctets.order(ByteOrder.BIG_ENDIAN);
+  }
+
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    super.handlerAdded(ctx);
+    logger.trace("Added " + RpcConstants.SASL_DECRYPTION_HANDLER + " handler");
+  }
+
+  @Override
+  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+    super.handlerRemoved(ctx);
+    logger.trace("Removed " + RpcConstants.SASL_DECRYPTION_HANDLER + " handler");
+  }
+
+  @Override
+  public void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws IOException {
+
+    if (!ctx.channel().isOpen()) {
+      logger.trace("Channel closed before decoding the message of {} bytes", msg.readableBytes());
+      msg.skipBytes(msg.readableBytes());
+      return;
+    }
+
+    try {
+      if (logger.isTraceEnabled()) {
+        // Fixed: the original message mentioned maxWrappedSize but supplied no placeholder
+        // or argument for it, so the value was never logged.
+        logger.trace("Trying to decrypt the encrypted message of size: {} with maxWrappedSize: {}",
+            msg.readableBytes(), maxWrappedSize);
+      }
+
+      // All the encrypted blocks are prefixed with its length in network byte order (or BigEndian format). Netty's
+      // default Byte order of ByteBuf is Little Endian, so we cannot just do msg.getInt() as that will read the 4
+      // octets in little endian format.
+      //
+      // We will read the length of one complete encrypted chunk and decode that.
+      msg.getBytes(msg.readerIndex(), lengthOctets.array(), 0, RpcConstants.LENGTH_FIELD_LENGTH);
+      final int wrappedMsgLength = lengthOctets.getInt(0);
+      msg.skipBytes(RpcConstants.LENGTH_FIELD_LENGTH);
+
+      // Since lengthBasedFrameDecoder will ensure we have enough bytes it's good to have this check here.
+      assert(msg.readableBytes() == wrappedMsgLength);
+
+      // Uncomment the below code if msg can contain both of Direct and Heap ByteBuf. Currently Drill only supports
+      // DirectByteBuf so the below condition will always be false. If the msg are always HeapByteBuf then in
+      // addition also remove the allocation of encodedMsg from constructor.
+      /*if (msg.hasArray()) {
+        wrappedMsg = msg.array();
+      } else {
+      if (RpcConstants.EXTRA_DEBUGGING) {
+        logger.debug("The input bytebuf is not backed by a byte array so allocating a new one");
+      }*/
+
+      // Check if the wrappedMsgLength doesn't exceed agreed upon maxWrappedSize. As per SASL RFC 2222/4422 we
+      // should close the connection since it represents a security attack.
+      if (wrappedMsgLength > maxWrappedSize) {
+        throw new RpcException(String.format("Received encoded buffer size: %d is larger than negotiated " +
+            "maxWrappedSize: %d. Closing the connection as this is unexpected.", wrappedMsgLength, maxWrappedSize));
+      }
+
+      final byte[] wrappedMsg = encodedMsg;
+      // Copy the wrappedMsgLength of bytes into the byte array
+      msg.getBytes(msg.readerIndex(), wrappedMsg, 0, wrappedMsgLength);
+      //}
+
+      // SASL library always copies the origMsg internally to a new byte array
+      // and return another new byte array after decrypting the message. The memory for this
+      // will be Garbage collected by JVM since SASL Library releases it's reference after
+      // returning the byte array.
+      final byte[] decodedMsg = saslCodec.unwrap(wrappedMsg, 0, wrappedMsgLength);
+
+      if (logger.isTraceEnabled()) {
+        logger.trace("Successfully decrypted incoming message. Length after decryption: {}", decodedMsg.length);
+      }
+
+      // Update the msg reader index since we have decrypted this chunk
+      msg.skipBytes(wrappedMsgLength);
+
+      // Allocate a new Bytebuf to copy the decrypted chunk.
+      final ByteBuf decodedMsgBuf = ctx.alloc().buffer(decodedMsg.length);
+      decodedMsgBuf.writeBytes(decodedMsg);
+
+      // Add the decrypted chunk to output buffer for next handler to take care of it.
+      out.add(decodedMsgBuf);
+
+    } catch (OutOfMemoryException e) {
+      logger.warn("Failure allocating buffer on incoming stream due to memory limits.");
+      msg.resetReaderIndex();
+      outOfMemoryHandler.handle();
+    } catch (IOException e) {
+      logger.error("Something went wrong while unwrapping the message: {} with MaxEncodeSize: {} and " +
+          "error: {}", msg, maxWrappedSize, e.getMessage());
+      throw e;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslEncryptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslEncryptionHandler.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslEncryptionHandler.java
new file mode 100644
index 0000000..10755c3
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/SaslEncryptionHandler.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+
+
+/**
+ * Handler to wrap the input Composite ByteBuf components separately and append the encrypted length for each
+ * component in the output ByteBuf. If there are multiple components in the input ByteBuf then each component will be
+ * encrypted individually and added to output ByteBuf with it's length prepended.
+ * <p>
+ * Example:
+ * <li>Input ByteBuf  --> [B1,B2] - 2 component ByteBuf of 16K byte each.
+ * <li>Output ByteBuf --> [[EBLN1, EB1], [EBLN2, EB2]] - List of ByteBuf's with each ByteBuf containing
+ *                    Encrypted Byte Length (EBLNx) in network order as per SASL RFC and Encrypted Bytes (EBx).
+ * </p>
+ */
+class SaslEncryptionHandler extends MessageToMessageEncoder<ByteBuf> {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(
+      SaslEncryptionHandler.class.getCanonicalName());
+
+  private final SaslCodec saslCodec;
+
+  private final int wrapSizeLimit;
+
+  private byte[] origMsgBuffer;
+
+  private final ByteBuffer lengthOctets;
+
+  private final OutOfMemoryHandler outOfMemoryHandler;
+
+  /**
+   * We don't provide preference to allocator to use heap buffer instead of direct buffer.
+   * Drill uses it's own buffer allocator which doesn't support heap buffer allocation. We use
+   * Drill buffer allocator in the channel.
+   */
+  SaslEncryptionHandler(SaslCodec saslCodec, final int wrapSizeLimit, final OutOfMemoryHandler oomHandler) {
+    this.saslCodec = saslCodec;
+    this.wrapSizeLimit = wrapSizeLimit;
+    this.outOfMemoryHandler = oomHandler;
+
+    // The maximum size of the component will be wrapSizeLimit. Since this is maximum size, we can allocate once
+    // and reuse it for each component encode.
+    origMsgBuffer = new byte[this.wrapSizeLimit];
+    lengthOctets = ByteBuffer.allocate(RpcConstants.LENGTH_FIELD_LENGTH);
+    lengthOctets.order(ByteOrder.BIG_ENDIAN);
+  }
+
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    super.handlerAdded(ctx);
+    logger.trace("Added " + RpcConstants.SASL_ENCRYPTION_HANDLER + " handler!");
+  }
+
+  @Override
+  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+    super.handlerRemoved(ctx);
+    logger.trace("Removed " + RpcConstants.SASL_ENCRYPTION_HANDLER + " handler");
+  }
+
+  public void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws IOException {
+
+    if (!ctx.channel().isOpen()) {
+      logger.debug("In " + RpcConstants.SASL_ENCRYPTION_HANDLER + " and channel is not open. " +
+          "So releasing msg memory before encryption.");
+      msg.release();
+      return;
+    }
+
+    try {
+      // If encryption is enabled then this handler will always get ByteBuf of type Composite ByteBuf
+      assert(msg instanceof CompositeByteBuf);
+
+      final CompositeByteBuf cbb = (CompositeByteBuf) msg;
+      final int numComponents = cbb.numComponents();
+
+      // Get all the components inside the Composite ByteBuf for encryption
+      for(int currentIndex = 0; currentIndex < numComponents; ++currentIndex) {
+        final ByteBuf component = cbb.component(currentIndex);
+
+        // Each component ByteBuf size should not be greater than wrapSizeLimit since ChunkCreationHandler
+        // will break the RPC message into chunks of wrapSizeLimit.
+        if (component.readableBytes() > wrapSizeLimit) {
+          throw new RpcException(String.format("Component Chunk size: %d is greater than the wrapSizeLimit: %d",
+              component.readableBytes(), wrapSizeLimit));
+        }
+
+        // Uncomment the below code if msg can contain both of Direct and Heap ByteBuf. Currently Drill only supports
+        // DirectByteBuf so the below condition will always be false. If the msg are always HeapByteBuf then in
+        // addition also remove the allocation of origMsgBuffer from constructor.
+        /*if (component.hasArray()) {
+          origMsg = component.array();
+        } else {
+
+        if (RpcConstants.EXTRA_DEBUGGING) {
+          logger.trace("The input bytebuf is not backed by a byte array so allocating a new one");
+        }*/
+        final byte[] origMsg = origMsgBuffer;
+        component.getBytes(component.readerIndex(), origMsg, 0, component.readableBytes());
+        //}
+
+        if(logger.isTraceEnabled()) {
+          logger.trace("Trying to encrypt chunk of size:{} with wrapSizeLimit:{} and chunkMode: {}",
+              component.readableBytes(), wrapSizeLimit);
+        }
+
+        // Length to encrypt will be component length not origMsg length since that can be greater.
+        final byte[] wrappedMsg = saslCodec.wrap(origMsg, 0, component.readableBytes());
+
+        if(logger.isTraceEnabled()) {
+          logger.trace("Successfully encrypted message, original size: {} Final Size: {}",
+              component.readableBytes(), wrappedMsg.length);
+        }
+
+        // Allocate the buffer (directByteBuff) for copying the encrypted byte array and 4 octets for length of the
+        // encrypted message. This is preferred since later on if the passed buffer is not in direct memory then it
+        // will be copied by the channel into a temporary direct memory which will be cached to the thread. The size
+        // of that temporary direct memory will be size of largest message send.
+        final ByteBuf encryptedBuf = ctx.alloc().buffer(wrappedMsg.length + RpcConstants.LENGTH_FIELD_LENGTH);
+
+        // Based on SASL RFC 2222/4422 we should have starting 4 octet as the length of the encrypted buffer in network
+        // byte order. SASL framework provided by JDK doesn't do that by default and leaves it upto application. Whereas
+        // Cyrus SASL implementation of sasl_encode does take care of this.
+        lengthOctets.putInt(wrappedMsg.length);
+        encryptedBuf.writeBytes(lengthOctets.array());
+
+        // reset the position for re-use in next round
+        lengthOctets.rewind();
+
+        // Write the encrypted bytes inside the buffer
+        encryptedBuf.writeBytes(wrappedMsg);
+
+        // Update the msg and component reader index
+        msg.skipBytes(component.readableBytes());
+        component.skipBytes(component.readableBytes());
+
+        // Add the encrypted buffer into the output to send it on wire.
+        out.add(encryptedBuf);
+      }
+    } catch (OutOfMemoryException e) {
+      logger.warn("Failure allocating buffer on incoming stream due to memory limits.");
+      msg.resetReaderIndex();
+      outOfMemoryHandler.handle();
+    } catch (IOException e) {
+      logger.error("Something went wrong while wrapping the message: {} with MaxRawWrapSize: {}, ChunkMode: {} " +
+          "and error: {}", msg, wrapSizeLimit, e.getMessage());
+      throw e;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ServerConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ServerConnection.java
index 3cdfdb2..6f878ef 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ServerConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ServerConnection.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.rpc;
 import javax.security.sasl.SaslServer;
 import java.io.IOException;
 
-public interface ServerConnection<S extends ServerConnection<S>> extends RemoteConnection {
+public interface ServerConnection<S extends ServerConnection<S>> extends RemoteConnection, EncryptionContext {
 
   // init only once
   void initSaslServer(String mechanismName) throws IOException;
@@ -34,4 +34,6 @@ public interface ServerConnection<S extends ServerConnection<S>> extends RemoteC
 
   void changeHandlerTo(RequestHandler<S> handler);
 
+  void disposeSaslServer();
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9a7ed29..071f086 100644
--- a/pom.xml
+++ b/pom.xml
@@ -229,6 +229,8 @@
             <exclude>**/cmake_install.cmake</exclude>
             <exclude>**/*.tbl</exclude>
             <exclude>**/*.httpd</exclude>
+            <exclude>**/*.autotools</exclude>
+            <exclude>**/*.cproject</exclude>
             <!-- TODO DRILL-4336: try to avoid the need to add this -->
             <exclude>dependency-reduced-pom.xml</exclude>
           </excludes>

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/protocol/readme.txt
----------------------------------------------------------------------
diff --git a/protocol/readme.txt b/protocol/readme.txt
index 6f502c4..9fdaf19 100644
--- a/protocol/readme.txt
+++ b/protocol/readme.txt
@@ -4,10 +4,24 @@ The java sources are generated into src/main/java and checked in.
 
 To regenerate the sources after making changes to .proto files
 ---------------------------------------------------------------
-1. Ensure that the protobuf 'protoc' tool (version 2.5 or newer) is
-1. Ensure that the protobuf 'protoc' tool (version 2.5 or newer, within the 2.x series) is
 in your PATH (you may need to download and build it first). You can 
 download it from http://code.google.com/p/protobuf/downloads/list.
 
+    Note: If generating sources on macOS, follow the instructions below:
+
+              a) Download and install "brew"
+                 Command: /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
+
+              b) Download and install "protoc"
+                 Command: brew install protobuf250  --- installs protobuf for version 2.5.0
+                          brew install protobuf     --- installs latest protobuf version
+
+              c) Check the version of "protoc"
+                 Command: protoc --version
+
+              d) Follow steps 2 and 3 below
+
 2. In protocol dir, run "mvn process-sources -P proto-compile" or "mvn clean install -P proto-compile".
 
 3. Check in the new/updated files.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
index 4bb2de7..b6c2bf4 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
@@ -997,6 +997,10 @@ public final class SchemaUserProtos
                     output.writeString(7, authenticationMechanisms, true);
                 for(org.apache.drill.exec.proto.UserProtos.RpcType supportedMethods : message.getSupportedMethodsList())
                     output.writeEnum(8, supportedMethods.getNumber(), true);
+                if(message.hasEncrypted())
+                    output.writeBool(9, message.getEncrypted(), false);
+                if(message.hasMaxWrappedSize())
+                    output.writeInt32(10, message.getMaxWrappedSize(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.UserProtos.BitToUserHandshake message)
             {
@@ -1058,6 +1062,12 @@ public final class SchemaUserProtos
                         case 8:
                             builder.addSupportedMethods(org.apache.drill.exec.proto.UserProtos.RpcType.valueOf(input.readEnum()));
                             break;
+                        case 9:
+                            builder.setEncrypted(input.readBool());
+                            break;
+                        case 10:
+                            builder.setMaxWrappedSize(input.readInt32());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -1105,6 +1115,8 @@ public final class SchemaUserProtos
                 case 6: return "serverInfos";
                 case 7: return "authenticationMechanisms";
                 case 8: return "supportedMethods";
+                case 9: return "encrypted";
+                case 10: return "maxWrappedSize";
                 default: return null;
             }
         }
@@ -1123,6 +1135,8 @@ public final class SchemaUserProtos
             fieldMap.put("serverInfos", 6);
             fieldMap.put("authenticationMechanisms", 7);
             fieldMap.put("supportedMethods", 8);
+            fieldMap.put("encrypted", 9);
+            fieldMap.put("maxWrappedSize", 10);
         }
     }
 


Mime
View raw message