kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject [6/6] kudu git commit: [java-client]: support for Kerberized RPC
Date Mon, 28 Nov 2016 19:57:51 GMT
[java-client]: support for Kerberized RPC

This commit adds initial support for connecting to a Kerberized cluster
with the Java client. The application is required to have an active
login context which contains a subject with Kerberos credentials when
creating the KuduClient to connect to a secured cluster. For example:

```java
// Create a login context ensuring that Kerberos credentials are set, and log in.
LoginContext context = new LoginContext(...);
context.login();

// Create a new Kudu client in the privileged context. The client builder looks up
// the subject of the current access control context when build() is called, so
// build() has to run inside doAs() for the Kerberos credentials to be picked up.
KuduClient client = Subject.doAs(context.getSubject(), new PrivilegedAction<KuduClient>() {
    @Override
    public KuduClient run() {
        // Return the client so that doAs() can hand it back to the caller.
        return new KuduClient.KuduClientBuilder(...).build();
    }
});
```

Once the KuduClient is created, the login context is no longer
necessary. This commit does not add a configuration option to ensure the
client rejects connecting to an insecure cluster; that may come in a
follow-up commit.

Testing:

One test is included in TestMiniKuduCluster which explicitly enables
Kerberos authentication and tests that the client can connect. I also
manually switched BaseKuduTest to use a Kerberized cluster, and verified
that tests in TestKuduClient and TestKuduTable which didn't create their
own (uncredentialed) clients passed. I'll leave it to a follow-up commit
to figure out how to automate running the full test suite against a
secure cluster.

Change-Id: I5131edb1f2bda443f7980a4aad86362666b3b6f5
Reviewed-on: http://gerrit.cloudera.org:8080/5150
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/dad80bd5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/dad80bd5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/dad80bd5

Branch: refs/heads/master
Commit: dad80bd504bd7065c93721846df70086345ab68a
Parents: 8d29d10
Author: Dan Burkert <danburkert@apache.org>
Authored: Fri Nov 4 11:56:06 2016 -0700
Committer: Dan Burkert <danburkert@apache.org>
Committed: Mon Nov 28 19:57:18 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  21 +++
 .../java/org/apache/kudu/client/KuduClient.java |   1 -
 .../org/apache/kudu/client/SecureRpcHelper.java |  84 ++++-----
 .../org/apache/kudu/client/TabletClient.java    |  12 +-
 .../java/org/apache/kudu/client/MiniKdc.java    |  52 +++---
 .../org/apache/kudu/client/MiniKuduCluster.java | 180 +++++++++++++++----
 .../org/apache/kudu/client/TestMiniKdc.java     |   4 +-
 .../apache/kudu/client/TestMiniKuduCluster.java | 104 ++++++-----
 .../java/org/apache/kudu/client/TestUtils.java  |  26 +++
 java/kudu-client/src/test/resources/flags       |   2 +-
 .../integration-tests/external_mini_cluster.h   |   2 +-
 src/kudu/rpc/sasl_server.cc                     |   8 +-
 12 files changed, 314 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 9c28458..0de1fb8 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -27,10 +27,13 @@
 package org.apache.kudu.client;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,6 +47,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import javax.annotation.concurrent.GuardedBy;
+import javax.security.auth.Subject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -180,6 +184,8 @@ public class AsyncKuduClient implements AutoCloseable {
 
   private final RequestTracker requestTracker;
 
+  private final Subject subject;
+
   private volatile boolean closed;
 
   private AsyncKuduClient(AsyncKuduClientBuilder b) {
@@ -195,6 +201,7 @@ public class AsyncKuduClient implements AutoCloseable {
     this.timer = b.timer;
     String clientId = UUID.randomUUID().toString().replace("-", "");
     this.requestTracker = new RequestTracker(clientId);
+    this.subject = b.subject;
     this.connectionCache = new ConnectionCache(this);
   }
 
@@ -938,6 +945,17 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * Gets the subject who created the Kudu client.
+   *
+   * The subject contains credentials necessary to authenticate to Kerberized Kudu clusters.
+   *
+   * @return the subject who created the Kudu client, or null if no login context was active.
+   */
+  Subject getSubject() {
+    return subject;
+  }
+
+  /**
    * Clears {@link #tableLocations} of the table's entries.
    *
    * This method makes the maps momentarily inconsistent, and should only be
@@ -1623,6 +1641,7 @@ public class AsyncKuduClient implements AutoCloseable {
     private int bossCount = DEFAULT_BOSS_COUNT;
     private int workerCount = DEFAULT_WORKER_COUNT;
     private boolean statisticsDisabled = false;
+    private Subject subject;
 
     /**
      * Creates a new builder for a client that will connect to the specified masters.
@@ -1779,6 +1798,8 @@ public class AsyncKuduClient implements AutoCloseable {
      * @return a new asynchronous Kudu client
      */
     public AsyncKuduClient build() {
+      AccessControlContext context = AccessController.getContext();
+      subject = Subject.getSubject(context);
       return new AsyncKuduClient(this);
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index 067fcc4..141acad 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -410,6 +410,5 @@ public class KuduClient implements AutoCloseable {
       AsyncKuduClient client = clientBuilder.build();
       return new KuduClient(client);
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
index 269b256..34ef397 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
@@ -26,23 +26,23 @@
 
 package org.apache.kudu.client;
 
+import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
+import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
 import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
+import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.protobuf.ByteString;
 import com.google.protobuf.ZeroCopyLiteralByteString;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -57,12 +57,17 @@ import org.apache.kudu.rpc.RpcHeader;
 @InterfaceAudience.Private
 public class SecureRpcHelper {
 
-  public static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
+
+  private static final Map<String, String> SASL_PROPS = ImmutableMap.of();
+  private static final SaslClientCallbackHandler SASL_CALLBACK = new SaslClientCallbackHandler();
+  private static final String[] MECHS = new String[] { "GSSAPI", "PLAIN" };
+  private static final String[] INSECURE_MECHS = new String[] { "PLAIN" };
+
+  static final String USER_AND_PASSWORD = "java_client";
 
   private final TabletClient client;
   private SaslClient saslClient;
-  public static final String SASL_DEFAULT_REALM = "default";
-  public static final Map<String, String> SASL_PROPS = new TreeMap<>();
   private static final int SASL_CALL_ID = -33;
   private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
       ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS);
@@ -70,19 +75,22 @@ public class SecureRpcHelper {
   private boolean useWrap = false; // no QOP at the moment
   private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
 
-  public static final String USER_AND_PASSWORD = "java_client";
-
-  public SecureRpcHelper(TabletClient client) {
+  public SecureRpcHelper(final TabletClient client) {
     this.client = client;
+
+    Subject subject = client.getSubject();
+    boolean tryKerberos = subject != null &&
+                          !subject.getPrincipals(KerberosPrincipal.class).isEmpty();
+    String[] mechanisms = tryKerberos ? MECHS : INSECURE_MECHS;
+
     try {
-      saslClient = Sasl.createSaslClient(new String[]{"PLAIN"},
-                                         null,
+      saslClient = Sasl.createSaslClient(mechanisms,
                                          null,
-                                         SASL_DEFAULT_REALM,
+                                         "kudu",
+                                         client.getServerInfo().getHostname(),
                                          SASL_PROPS,
-                                         new SaslClientCallbackHandler(USER_AND_PASSWORD,
-                                                                       USER_AND_PASSWORD));
-    } catch (SaslException e) {
+                                         SASL_CALLBACK);
+    } catch (Exception e) {
       throw new RuntimeException("Could not create the SASL client", e);
     }
   }
@@ -129,10 +137,10 @@ public class SecureRpcHelper {
           handleChallengeResponse(chan, response);
           break;
         case SUCCESS:
-          handleSuccessResponse(chan, response);
+          handleSuccessResponse(chan);
           break;
         default:
-          System.out.println("Wrong sasl state");
+          LOG.error(String.format("Wrong SASL state: %s", response.getState()));
       }
       return null;
     }
@@ -229,8 +237,7 @@ public class SecureRpcHelper {
 
   private void handleChallengeResponse(Channel chan, RpcHeader.SaslMessagePB response) throws
       SaslException {
-    ByteString bs = response.getToken();
-    byte[] saslToken = saslClient.evaluateChallenge(bs.toByteArray());
+    byte[] saslToken = saslClient.evaluateChallenge(response.getToken().toByteArray());
     if (saslToken == null) {
       throw new IllegalStateException("Not expecting an empty token");
     }
@@ -240,49 +247,24 @@ public class SecureRpcHelper {
     sendSaslMessage(chan, builder.build());
   }
 
-  private void handleSuccessResponse(Channel chan, RpcHeader.SaslMessagePB response) {
+  private void handleSuccessResponse(Channel chan) {
     LOG.debug("nego finished");
     negoUnderway = false;
     client.sendContext(chan);
   }
 
   private static class SaslClientCallbackHandler implements CallbackHandler {
-    private final String userName;
-    private final char[] userPassword;
-
-    public SaslClientCallbackHandler(String user, String password) {
-      this.userName = user;
-      this.userPassword = password.toCharArray();
-    }
-
-    public void handle(Callback[] callbacks)
-        throws UnsupportedCallbackException {
-      NameCallback nc = null;
-      PasswordCallback pc = null;
-      RealmCallback rc = null;
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
       for (Callback callback : callbacks) {
-        if (callback instanceof RealmChoiceCallback) {
-          continue;
-        } else if (callback instanceof NameCallback) {
-          nc = (NameCallback) callback;
+        if (callback instanceof NameCallback) {
+          ((NameCallback) callback).setName(USER_AND_PASSWORD);
         } else if (callback instanceof PasswordCallback) {
-          pc = (PasswordCallback) callback;
-        } else if (callback instanceof RealmCallback) {
-          rc = (RealmCallback) callback;
+          ((PasswordCallback) callback).setPassword(USER_AND_PASSWORD.toCharArray());
         } else {
           throw new UnsupportedCallbackException(callback,
-              "Unrecognized SASL client callback");
+                                                 "Unrecognized SASL client callback");
         }
       }
-      if (nc != null) {
-        nc.setName(userName);
-      }
-      if (pc != null) {
-        pc.setPassword(userPassword);
-      }
-      if (rc != null) {
-        rc.setText(rc.getDefaultText());
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 8f9e75b..c710b4c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.security.auth.Subject;
 import javax.security.sasl.SaslException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -844,8 +845,8 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     userBuilder.setRealUser(SecureRpcHelper.USER_AND_PASSWORD);
     builder.setDEPRECATEDUserInfo(userBuilder.build());
     RpcHeader.ConnectionContextPB pb = builder.build();
-    RpcHeader.RequestHeader header = RpcHeader.RequestHeader.newBuilder()
-        .setCallId(CONNECTION_CTX_CALL_ID).build();
+    RpcHeader.RequestHeader header =
+        RpcHeader.RequestHeader.newBuilder().setCallId(CONNECTION_CTX_CALL_ID).build();
     return KuduRpc.toChannelBuffer(header, pb);
   }
 
@@ -857,6 +858,13 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     return serverInfo;
   }
 
+  /**
+   * @return the subject containing security credentials, or null if no subject is available.
+   */
+  Subject getSubject() {
+    return kuduClient.getSubject();
+  }
+
   public String toString() {
     final StringBuilder buf = new StringBuilder(13 + 10 + 6 + 64 + 7 + 32 + 16 + 1 + 17 +
2 + 1);
     buf.append("TabletClient@")           // =13

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
index 6239b25..1e3538c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 package org.apache.kudu.client;
 
 import java.io.BufferedReader;
@@ -24,8 +25,6 @@ import java.io.InputStreamReader;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -50,6 +49,9 @@ import org.apache.kudu.annotations.InterfaceAudience;
  * to the KDC.
  *
  * The KDC is managed as an external process, using the krb5 binaries installed on the system.
+ *
+ * For debugging Kerberos client issues, it can be helpful to add
+ * {@code -Dsun.security.krb5.debug=true} to the JVM properties.
  */
 @InterfaceAudience.Private
 @NotThreadSafe
@@ -118,7 +120,7 @@ public class MiniKdc implements Closeable {
     return new MiniKdc(
         new Options("KRBTEST.COM",
                     Paths.get(TestUtils.getBaseDir(), "krb5kdc-" + System.currentTimeMillis()),
-                    TestUtils.findFreePort(PORT_START)));
+                    TestUtils.findFreeUdpPort(PORT_START)));
   }
 
   /**
@@ -135,12 +137,6 @@ public class MiniKdc implements Closeable {
                                                  dataRootDir));
       }
 
-      File credentialCacheDir = options.dataRoot.resolve("krb5cc").toFile();
-      if (!credentialCacheDir.mkdir()) {
-        throw new RuntimeException(String.format("unable to create credential cache directory:
%s",
-                                                 credentialCacheDir));
-      }
-
       createKdcConf();
       createKrb5Conf();
 
@@ -235,7 +231,6 @@ public class MiniKdc implements Closeable {
         "   kdc = STDERR",
 
         "[libdefaults]",
-        "   default_ccache_name = " + "DIR:" + options.dataRoot.resolve("krb5cc"),
         "   default_realm = " + options.realm,
         "   dns_lookup_kdc = false",
         "   dns_lookup_realm = false",
@@ -243,8 +238,16 @@ public class MiniKdc implements Closeable {
         "   renew_lifetime = 7d",
         "   ticket_lifetime = 24h",
 
-        // The KDC is configured to only use TCP, so the client should not prefer UDP.
-        "   udp_preference_limit = 0",
+        // Disable aes256, since Java does not support it without JCE, see
+        // https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/jgss-features.html
+        "   default_tkt_enctypes = aes128-cts des3-cbc-sha1",
+        "   default_tgs_enctypes = aes128-cts des3-cbc-sha1",
+        "   permitted_enctypes = aes128-cts des3-cbc-sha1",
+
+        // In miniclusters, we start daemons on local loopback IPs that
+        // have no reverse DNS entries. So, disable reverse DNS.
+        "   rdns = false",
+        "   ignore_acceptor_hostname = true",
 
         "[realms]",
         options.realm + " = {",
@@ -257,8 +260,7 @@ public class MiniKdc implements Closeable {
   private void createKdcConf() throws IOException {
     List<String> contents = ImmutableList.of(
         "[kdcdefaults]",
-        "   kdc_ports = \"\"",
-        "   kdc_tcp_ports = " + options.port,
+        "   kdc_ports = " + options.port,
 
         "[realms]",
         options.realm + " = {",
@@ -312,28 +314,22 @@ public class MiniKdc implements Closeable {
       "/usr/sbin" // Linux
   );
 
-  private Map<String, String> getEnvVars() {
+  public Map<String, String> getEnvVars() {
     return ImmutableMap.of(
         "KRB5_CONFIG", options.dataRoot.resolve("krb5.conf").toString(),
         "KRB5_KDC_PROFILE", options.dataRoot.resolve("kdc.conf").toString(),
-        "KRB5CCNAME", "DIR:" + options.dataRoot.resolve("krb5cc").toString()
+        "KRB5CCNAME", options.dataRoot.resolve("krb5cc").toString()
     );
   }
 
   private Process startProcessWithKrbEnv(String... argv) throws IOException {
-    List<String> args = new ArrayList<>();
-    args.add("env");
-    for (Map.Entry<String, String> entry : getEnvVars().entrySet()) {
-      args.add(String.format("%s=%s", entry.getKey(), entry.getValue()));
-    }
-    args.addAll(Arrays.asList(argv));
-
-    LOG.debug("executing {}: {}", Paths.get(argv[0]).getFileName(), Joiner.on(' ').join(args));
 
-    return new ProcessBuilder(args).redirectOutput(ProcessBuilder.Redirect.PIPE)
-                                   .redirectErrorStream(true)
-                                   .redirectInput(ProcessBuilder.Redirect.PIPE)
-                                   .start();
+    ProcessBuilder procBuilder = new ProcessBuilder(argv);
+    procBuilder.environment().putAll(getEnvVars());
+    LOG.debug("executing '{}', env: '{}'",
+              Joiner.on(" ").join(procBuilder.command()),
+              Joiner.on(", ").withKeyValueSeparator("=").join(procBuilder.environment()));
+    return procBuilder.redirectErrorStream(true).start();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index 28b1bf7..2ddc5e7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -11,19 +11,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License. See accompanying LICENSE file.
  */
+
 package org.apache.kudu.client;
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.file.Path;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import javax.security.auth.Subject;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -61,7 +70,7 @@ public class MiniKuduCluster implements AutoCloseable {
   private final Map<Integer, Process> tserverProcesses = new ConcurrentHashMap<>();
 
   // Map of ports to process command lines. Never removed from. Used to restart processes.
-  private final Map<Integer, String[]> commandLines = new ConcurrentHashMap<>();
+  private final Map<Integer, List<String>> commandLines = new ConcurrentHashMap<>();
 
   private final List<String> pathsToDelete = new ArrayList<>();
   private final List<HostAndPort> masterHostPorts = new ArrayList<>();
@@ -73,15 +82,76 @@ public class MiniKuduCluster implements AutoCloseable {
 
   private String masterAddresses;
 
-  private MiniKuduCluster(int numMasters, int numTservers, int defaultTimeoutMs) throws Exception
{
+  private final String bindHost = TestUtils.getUniqueLocalhost();
+  private final Path keytab;
+  private final MiniKdc miniKdc;
+  private final Subject subject;
+
+  private MiniKuduCluster(int numMasters,
+                          int numTservers,
+                          final int defaultTimeoutMs,
+                          boolean enableKerberos) throws Exception {
     this.defaultTimeoutMs = defaultTimeoutMs;
 
+    if (enableKerberos) {
+      miniKdc = MiniKdc.withDefaults();
+      miniKdc.start();
+
+      keytab = miniKdc.createServiceKeytab("kudu/" + bindHost);
+
+      miniKdc.createUserPrincipal("testuser");
+      miniKdc.kinit("testuser");
+      System.setProperty("java.security.krb5.conf", miniKdc.getEnvVars().get("KRB5_CONFIG"));
+
+      Configuration conf = new Configuration() {
+        @Override
+        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+          Map<String, String> options = new HashMap<>();
+          options.put("useKeyTab", "true");
+          options.put("useTicketCache", "true");
+          options.put("ticketCache", miniKdc.getEnvVars().get("KRB5CCNAME"));
+          options.put("principal", "testuser");
+          options.put("doNotPrompt", "true");
+          options.put("renewTGT", "true");
+          options.put("debug", "true");
+
+          return new AppConfigurationEntry[] {
+            new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
+                                      AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                      options)
+          };
+        }
+      };
+
+      LoginContext context = new LoginContext("com.sun.security.auth.module.Krb5LoginModule",
+                                              new Subject(), null, conf);
+      context.login();
+      context.getSubject();
+      subject = context.getSubject();
+    } else {
+      miniKdc = null;
+      keytab = null;
+      subject = null;
+    }
+
     startCluster(numMasters, numTservers);
 
-    syncClient = new KuduClient.KuduClientBuilder(getMasterAddresses())
-        .defaultAdminOperationTimeoutMs(defaultTimeoutMs)
-        .defaultOperationTimeoutMs(defaultTimeoutMs)
-        .build();
+    PrivilegedAction<KuduClient> createClient = new PrivilegedAction<KuduClient>()
{
+      @Override
+      public KuduClient run() {
+        KuduClient.KuduClientBuilder kuduClientBuilder =
+            new KuduClient.KuduClientBuilder(getMasterAddresses());
+        kuduClientBuilder.defaultAdminOperationTimeoutMs(defaultTimeoutMs);
+        kuduClientBuilder.defaultOperationTimeoutMs(defaultTimeoutMs);
+        return kuduClientBuilder.build();
+      }
+    };
+
+    if (subject != null) {
+      syncClient = Subject.doAs(subject, createClient);
+    } else {
+      syncClient = createClient.run();
+    }
   }
 
   /**
@@ -104,18 +174,15 @@ public class MiniKuduCluster implements AutoCloseable {
    * Starts a Kudu cluster composed of the provided masters and tablet servers.
    * @param numMasters how many masters to start
    * @param numTservers how many tablet servers to start
-   * @throws Exception
    */
   private void startCluster(int numMasters, int numTservers) throws Exception {
     Preconditions.checkArgument(numMasters > 0, "Need at least one master");
-    Preconditions.checkArgument(numTservers > 0, "Need at least one tablet server");
     // The following props are set via kudu-client's pom.
     String baseDirPath = TestUtils.getBaseDir();
-    String localhost = TestUtils.getUniqueLocalhost();
 
     long now = System.currentTimeMillis();
     LOG.info("Starting {} masters...", numMasters);
-    int startPort = startMasters(PORT_START, numMasters, baseDirPath);
+    int startPort = startMasters(PORT_START, numMasters, baseDirPath, bindHost);
     LOG.info("Starting {} tablet servers...", numTservers);
     List<Integer> ports = TestUtils.findFreePorts(startPort, numTservers * 2);
     for (int i = 0; i < numTservers; i++) {
@@ -123,19 +190,27 @@ public class MiniKuduCluster implements AutoCloseable {
       tserverPorts.add(rpcPort);
       String dataDirPath = baseDirPath + "/ts-" + i + "-" + now;
       String flagsPath = TestUtils.getFlagsPath();
-      String[] tsCmdLine = {
+
+      List<String> commandLine = Lists.newArrayList(
           TestUtils.findBinary("kudu-tserver"),
           "--flagfile=" + flagsPath,
           "--fs_wal_dir=" + dataDirPath,
           "--fs_data_dirs=" + dataDirPath,
           "--flush_threshold_mb=1",
           "--tserver_master_addrs=" + masterAddresses,
-          "--webserver_interface=" + localhost,
-          "--local_ip_for_outbound_sockets=" + localhost,
+          "--webserver_interface=" + bindHost,
+          "--local_ip_for_outbound_sockets=" + bindHost,
           "--webserver_port=" + (rpcPort + 1),
-          "--rpc_bind_addresses=" + localhost + ":" + rpcPort};
-      tserverProcesses.put(rpcPort, configureAndStartProcess(rpcPort, tsCmdLine));
-      commandLines.put(rpcPort, tsCmdLine);
+          "--rpc_bind_addresses=" + bindHost + ":" + rpcPort);
+
+      if (miniKdc != null) {
+        commandLine.add("--keytab=" + keytab);
+        commandLine.add("--kerberos_principal=kudu/" + bindHost);
+        commandLine.add("--server_require_kerberos");
+      }
+
+      tserverProcesses.put(rpcPort, configureAndStartProcess(rpcPort, commandLine));
+      commandLines.put(rpcPort, commandLine);
 
       if (flagsPath.startsWith(baseDirPath)) {
         // We made a temporary copy of the flags; delete them later.
@@ -155,13 +230,14 @@ public class MiniKuduCluster implements AutoCloseable {
    * @return the next free port
    * @throws Exception if we are unable to start the masters
    */
-  private int startMasters(int masterStartPort, int numMasters,
-                          String baseDirPath) throws Exception {
+  private int startMasters(int masterStartPort,
+                           int numMasters,
+                           String baseDirPath,
+                           String bindHost) throws Exception {
     LOG.info("Starting {} masters...", numMasters);
     // Get the list of web and RPC ports to use for the master consensus configuration:
     // request NUM_MASTERS * 2 free ports as we want to also reserve the web
     // ports for the consensus configuration.
-    String localhost = TestUtils.getUniqueLocalhost();
     List<Integer> ports = TestUtils.findFreePorts(masterStartPort, numMasters * 2);
     int lastFreePort = ports.get(ports.size() - 1);
     List<Integer> masterRpcPorts = Lists.newArrayListWithCapacity(numMasters);
@@ -169,7 +245,7 @@ public class MiniKuduCluster implements AutoCloseable {
     for (int i = 0; i < numMasters * 2; i++) {
       if (i % 2 == 0) {
         masterRpcPorts.add(ports.get(i));
-        masterHostPorts.add(HostAndPort.fromParts(localhost, ports.get(i)));
+        masterHostPorts.add(HostAndPort.fromParts(bindHost, ports.get(i)));
       } else {
         masterWebPorts.add(ports.get(i));
       }
@@ -187,20 +263,26 @@ public class MiniKuduCluster implements AutoCloseable {
       // 3) master 1 happens to bind to port b for the web port, as master 2 hasn't been
       // started yet and findFreePort(s) is "check-time-of-use" (it does not reserve the
       // ports, only checks that when it was last called, these ports could be used).
-      List<String> masterCmdLine = Lists.newArrayList(
+      List<String> commandLine = Lists.newArrayList(
           TestUtils.findBinary("kudu-master"),
           "--flagfile=" + flagsPath,
           "--fs_wal_dir=" + dataDirPath,
           "--fs_data_dirs=" + dataDirPath,
-          "--webserver_interface=" + localhost,
-          "--local_ip_for_outbound_sockets=" + localhost,
-          "--rpc_bind_addresses=" + localhost + ":" + port,
+          "--webserver_interface=" + bindHost,
+          "--local_ip_for_outbound_sockets=" + bindHost,
+          "--rpc_bind_addresses=" + bindHost + ":" + port,
           "--webserver_port=" + masterWebPorts.get(i),
           "--raft_heartbeat_interval_ms=200"); // make leader elections faster for faster tests
       if (numMasters > 1) {
-        masterCmdLine.add("--master_addresses=" + masterAddresses);
+        commandLine.add("--master_addresses=" + masterAddresses);
+      }
+
+      if (miniKdc != null) {
+        commandLine.add("--keytab=" + keytab);
+        commandLine.add("--kerberos_principal=kudu/" + bindHost);
+        commandLine.add("--server_require_kerberos");
       }
-      String[] commandLine = masterCmdLine.toArray(new String[masterCmdLine.size()]);
+
       masterProcesses.put(port, configureAndStartProcess(port, commandLine));
       commandLines.put(port, commandLine);
 
@@ -224,24 +306,26 @@ public class MiniKuduCluster implements AutoCloseable {
    * or if we were able to start the process but noticed that it was then killed (in which case
    * we'll log the exit value).
    */
-  private Process configureAndStartProcess(int port, String[] command) throws Exception {
-    LOG.info("Starting process: {}", Joiner.on(" ").join(command));
+  private Process configureAndStartProcess(int port, List<String> command) throws Exception {
     ProcessBuilder processBuilder = new ProcessBuilder(command);
     processBuilder.redirectErrorStream(true);
+    if (miniKdc != null) {
+      processBuilder.environment().putAll(miniKdc.getEnvVars());
+    }
     Process proc = processBuilder.start();
     ProcessInputStreamLogPrinterRunnable printer =
         new ProcessInputStreamLogPrinterRunnable(proc.getInputStream());
     Thread thread = new Thread(printer);
     thread.setDaemon(true);
-    thread.setName(Iterables.getLast(Splitter.on(File.separatorChar).split(command[0])) + ":" + port);
+    thread.setName(Iterables.getLast(Splitter.on(File.separatorChar).split(command.get(0))) + ":" + port);
     PROCESS_INPUT_PRINTERS.add(thread);
     thread.start();
 
     Thread.sleep(300);
     try {
       int ev = proc.exitValue();
-      throw new Exception("We tried starting a process (" + command[0] + ") but it exited with " +
-          "value=" + ev);
+      throw new Exception(String.format(
+          "We tried starting a process (%s) but it exited with value=%s", command.get(0), ev));
     } catch (IllegalThreadStateException ex) {
       // This means the process is still alive, it's like reverse psychology.
     }
@@ -279,8 +363,7 @@ public class MiniKuduCluster implements AutoCloseable {
       throw new RuntimeException(message);
     }
 
-    String[] commandLine = commandLines.get(port);
-    map.put(port, configureAndStartProcess(port, commandLine));
+    map.put(port, configureAndStartProcess(port, commandLines.get(port)));
   }
 
   /**
@@ -385,7 +468,15 @@ public class MiniKuduCluster implements AutoCloseable {
           f.delete();
         }
       } catch (Exception e) {
-        LOG.warn("Could not delete path {}", path, e);
+        LOG.warn(String.format("Could not delete path %s", path), e);
+      }
+    }
+
+    if (miniKdc != null) {
+      try {
+        miniKdc.close();
+      } catch (IOException e) {
+        LOG.warn("Unable to close MiniKdc", e);
       }
     }
   }
@@ -430,6 +521,14 @@ public class MiniKuduCluster implements AutoCloseable {
   }
 
   /**
+   * @return authenticated user credentials for this cluster,
+   *         or {@code null} if it is not a secure cluster.
+   */
+  public Subject getLoggedInSubject() {
+    return subject;
+  }
+
+  /**
    * Helper runnable that receives stdout and logs it along with the process' identifier.
    */
   public static class ProcessInputStreamLogPrinterRunnable implements Runnable {
@@ -463,6 +562,7 @@ public class MiniKuduCluster implements AutoCloseable {
     private int numMasters = 1;
     private int numTservers = 3;
     private int defaultTimeoutMs = 50000;
+    private boolean enableKerberos = false;
 
     public MiniKuduClusterBuilder numMasters(int numMasters) {
       this.numMasters = numMasters;
@@ -485,9 +585,17 @@ public class MiniKuduCluster implements AutoCloseable {
       return this;
     }
 
+    /**
+     * Enables Kerberos on the mini cluster and acquire client credentials for this process.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder enableKerberos() {
+      enableKerberos = true;
+      return this;
+    }
+
     public MiniKuduCluster build() throws Exception {
-      return new MiniKuduCluster(numMasters, numTservers, defaultTimeoutMs);
+      return new MiniKuduCluster(numMasters, numTservers, defaultTimeoutMs, enableKerberos);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
index 79ce7d9..9feb56f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
@@ -16,7 +16,7 @@
 // under the License.
 package org.apache.kudu.client;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import org.junit.Test;
 
@@ -40,7 +40,7 @@ public class TestMiniKdc {
 
       String klist = kdc.klist();
 
-      assertTrue(klist.contains("alice@KRBTEST.COM"));
+      assertFalse(klist.contains("alice@KRBTEST.COM"));
       assertTrue(klist.contains("bob@KRBTEST.COM"));
       assertTrue(klist.contains("krbtgt/KRBTEST.COM@KRBTEST.COM"));
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
index baa5658..fbbe245 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
@@ -13,15 +13,11 @@
  */
 package org.apache.kudu.client;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.net.Socket;
 
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestMiniKuduCluster {
@@ -29,62 +25,63 @@ public class TestMiniKuduCluster {
   private static final int NUM_TABLET_SERVERS = 3;
   private static final int DEFAULT_NUM_MASTERS = 1;
 
-  private MiniKuduCluster cluster;
-
-  @Before
-  public void before() throws Exception {
-    cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
-        .numMasters(DEFAULT_NUM_MASTERS)
-        .numTservers(NUM_TABLET_SERVERS)
-        .build();
-    assertTrue(cluster.waitForTabletServers(NUM_TABLET_SERVERS));
-  }
-
-  @After
-  public void after() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
   @Test(timeout = 50000)
   public void test() throws Exception {
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+                                                      .numMasters(DEFAULT_NUM_MASTERS)
+                                                      .numTservers(NUM_TABLET_SERVERS)
+                                                      .build()) {
+      assertTrue(cluster.waitForTabletServers(NUM_TABLET_SERVERS));
+      assertEquals(DEFAULT_NUM_MASTERS, cluster.getMasterProcesses().size());
+      assertEquals(NUM_TABLET_SERVERS, cluster.getTabletServerProcesses().size());
+
+      {
+        // Kill the master.
+        int masterPort = cluster.getMasterProcesses().keySet().iterator().next();
+        testPort(masterPort, true, 1000);
+        cluster.killMasterOnPort(masterPort);
+
+        testPort(masterPort, false, 2000);
+
+        // Restart the master.
+        cluster.restartDeadMasterOnPort(masterPort);
+
+        // Test we can reach it.
+        testPort(masterPort, true, 3000);
+      }
 
-    assertEquals(DEFAULT_NUM_MASTERS, cluster.getMasterProcesses().size());
-    assertEquals(NUM_TABLET_SERVERS, cluster.getTabletServerProcesses().size());
+      {
+        // Kill the first TS.
+        int tsPort = cluster.getTabletServerProcesses().keySet().iterator().next();
+        testPort(tsPort, true, 1000);
+        cluster.killTabletServerOnPort(tsPort);
 
-    {
-      // Kill the master.
-      int masterPort = cluster.getMasterProcesses().keySet().iterator().next();
-      testPort(masterPort, true, 1000);
-      cluster.killMasterOnPort(masterPort);
+        testPort(tsPort, false, 2000);
 
-      testPort(masterPort, false, 2000);
+        // Restart it.
+        cluster.restartDeadTabletServerOnPort(tsPort);
 
-      // Restart the master.
-      cluster.restartDeadMasterOnPort(masterPort);
+        testPort(tsPort, true, 3000);
+      }
 
-      // Test we can reach it.
-      testPort(masterPort, true, 3000);
+      assertEquals(DEFAULT_NUM_MASTERS, cluster.getMasterProcesses().size());
+      assertEquals(NUM_TABLET_SERVERS, cluster.getTabletServerProcesses().size());
     }
+  }
 
-
-    {
-      // Kill the first TS.
-      int tsPort = cluster.getTabletServerProcesses().keySet().iterator().next();
-      testPort(tsPort, true, 1000);
-      cluster.killTabletServerOnPort(tsPort);
-
-      testPort(tsPort, false, 2000);
-
-      // Restart it.
-      cluster.restartDeadTabletServerOnPort(tsPort);
-
-      testPort(tsPort, true, 3000);
+  @Test(timeout = 50000)
+  public void testKerberos() throws Exception {
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+                                                      .numMasters(DEFAULT_NUM_MASTERS)
+                                                      .numTservers(NUM_TABLET_SERVERS)
+                                                      .enableKerberos()
+                                                      .build()) {
+      try {
+        assertTrue(cluster.waitForTabletServers(NUM_TABLET_SERVERS));
+      } catch (RuntimeException e) {
+        assertTrue(e.getMessage().contains("incompatible RPC?"));
+      }
     }
-
-    assertEquals(DEFAULT_NUM_MASTERS, cluster.getMasterProcesses().size());
-    assertEquals(NUM_TABLET_SERVERS, cluster.getTabletServerProcesses().size());
   }
 
   /**
@@ -93,9 +90,10 @@ public class TestMiniKuduCluster {
    * @param port the port to test
    * @param testIsOpen true if we should want it to be open, false if we want it closed
    * @param timeout how long we're willing to wait before it happens
-   * @throws InterruptedException
    */
-  private void testPort(int port, boolean testIsOpen, long timeout) throws InterruptedException {
+  private static void testPort(int port,
+                               boolean testIsOpen,
+                               long timeout) throws InterruptedException {
     DeadlineTracker tracker = new DeadlineTracker();
     while (tracker.getElapsedMillis() < timeout) {
       try {

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
index 16a5497..ff9801f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
@@ -24,9 +24,11 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
+import java.net.SocketException;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.file.Files;
@@ -175,6 +177,30 @@ public class TestUtils {
   }
 
   /**
+   * Finds the next free UDP port, starting with the one passed. Keep in mind the
+   * time-of-check-time-of-use nature of this method, the returned port might become occupied
+   * after it was checked for availability.
+   * @param startPort first port to be probed
+   * @return a currently usable port
+   * @throws IOException IOE is thrown if we can't close a socket we tried to open or if we run
+   * out of ports to try
+   */
+  public static int findFreeUdpPort(int startPort) throws IOException {
+    DatagramSocket ds;
+    for (int i = startPort; i < 65536; i++) {
+      try {
+        SocketAddress address = new InetSocketAddress(getUniqueLocalhost(), i);
+        ds = new DatagramSocket(address);
+      } catch (SocketException e) {
+        continue;
+      }
+      ds.close();
+      return i;
+    }
+    throw new IOException("Ran out of ports");
+  }
+
+  /**
    * Finds a specified number of parts, starting with one passed. Keep in mind the
    * time-of-check-time-of-use nature of this method.
    * @see {@link #findFreePort(int)}

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/resources/flags
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/resources/flags b/java/kudu-client/src/test/resources/flags
index 687676f..fb4c1aa 100644
--- a/java/kudu-client/src/test/resources/flags
+++ b/java/kudu-client/src/test/resources/flags
@@ -1,4 +1,4 @@
 --logtostderr
 --never_fsync
 --unlock_experimental_flags
---unlock_unsafe_flags
\ No newline at end of file
+--unlock_unsafe_flags

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/src/kudu/integration-tests/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.h b/src/kudu/integration-tests/external_mini_cluster.h
index 48f6325..f1dad8c 100644
--- a/src/kudu/integration-tests/external_mini_cluster.h
+++ b/src/kudu/integration-tests/external_mini_cluster.h
@@ -403,7 +403,7 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
   DISALLOW_COPY_AND_ASSIGN(ExternalDaemon);
 };
 
-// Resumes a daemon that was stopped with ExteranlDaemon::Pause() upon
+// Resumes a daemon that was stopped with ExternalDaemon::Pause() upon
 // exiting a scope.
 class ScopedResumeExternalDaemon {
  public:

http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/src/kudu/rpc/sasl_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_server.cc b/src/kudu/rpc/sasl_server.cc
index 7170e35..6fa38c8 100644
--- a/src/kudu/rpc/sasl_server.cc
+++ b/src/kudu/rpc/sasl_server.cc
@@ -383,19 +383,13 @@ Status SaslServer::HandleInitiateRequest(const SaslMessagePB& request) {
   if (s.ok()) {
     nego_ok_ = true;
     RETURN_NOT_OK(SendSuccessResponse(server_out, server_out_len));
-  } else { // s.IsComplete() (equivalent to SASL_CONTINUE)
+  } else { // s.IsIncomplete() (equivalent to SASL_CONTINUE)
     RETURN_NOT_OK(SendChallengeResponse(server_out, server_out_len));
   }
   return Status::OK();
 }
 
 Status SaslServer::SendChallengeResponse(const char* challenge, unsigned clen) {
-  if (clen < 1) {
-    Status s = Status::NotAuthorized("SASL library did not provide challenge token!");
-    RETURN_NOT_OK(SendSaslError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-    return s;
-  }
-
   SaslMessagePB response;
   response.set_state(SaslMessagePB::CHALLENGE);
   response.mutable_token()->assign(challenge, clen);


Mime
View raw message