kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject [1/7] kudu git commit: java: fix ability to connect to a real Kerberized cluster
Date Wed, 08 Feb 2017 22:03:05 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 959a7835d -> b7b418b5e


java: fix ability to connect to a real Kerberized cluster

This fixes a couple issues seen when I tried to run the Java client
from within Impala against a Kerberized Kudu cluster:

* Previously, the ServerInfo class remembered the already-resolved
  address of the server rather than its hostname. This meant that we
  would try to connect to a Kerberos principal "kudu/1.2.3.4" rather
  than "kudu/foo.example.com". This typically would not match the actual
  principal the server was using, resulting in an error that the server
  principal was not found in the database.

* We previously were using 'subject.doAs()' when initializing the SASL
  server, but not when evaluating challenges. It turns out that the
  GSSAPI mechanism only looks for the Kerberos credentials while
  evaluating the challenge. This moves the 'doAs()' to wrap the
  challenge.

* Another issue with the SASL setup was that we were passing all of the
  available client mechanisms into Sasl.createSaslClient() before seeing
  which mechanisms the server was advertised. the SASL library seems to
  always prefer GSSAPI over PLAIN when available. This meant that, if
  the server had Kerberos credentials, it would attempt to use GSSAPI
  and not PLAIN even if connecting to a server which only advertised
  plain. This fixes the negotiation to no longer ignore the server-side
  advertised mechanisms, and instead actually negotiate by picking the
  best mechanism which is both advertised by the server and
  initializable by the client.

* We previously assumed that Kerberos credentials would be available
  from the 'Subject' without any explicit login call. This isn't the
  case: we have to explicitly set up a LoginContext and Configuration to
  log in from the credentials cache. This changes the client constructor
  to login from the ccache if there is no Subject available with
  Kerberos credentials.

With these changes I was able to run an Impala query against a cluster
with -server_require_kerberos.

Change-Id: I6b96fad3cfb40500d7a75e5070ea21bc8e00cbd8
Reviewed-on: http://gerrit.cloudera.org:8080/5922
Reviewed-by: Dan Burkert <danburkert@apache.org>
Reviewed-by: Jean-Daniel Cryans <jdcryans@apache.org>
Tested-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/716a1fa1
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/716a1fa1
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/716a1fa1

Branch: refs/heads/master
Commit: 716a1fa1ea6bdae0268394c84eca27af0a01b6f2
Parents: 959a783
Author: Todd Lipcon <todd@apache.org>
Authored: Mon Feb 6 19:18:24 2017 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Wed Feb 8 21:27:02 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  16 +--
 .../org/apache/kudu/client/ConnectionCache.java |  22 ++--
 .../org/apache/kudu/client/SecureRpcHelper.java | 110 +++++++++++++------
 .../java/org/apache/kudu/client/ServerInfo.java |  28 +++--
 .../java/org/apache/kudu/util/SecurityUtil.java | 103 +++++++++++++++++
 .../java/org/apache/kudu/client/MiniKdc.java    |  12 +-
 .../org/apache/kudu/client/MiniKuduCluster.java |  70 ++----------
 .../apache/kudu/client/TestConnectionCache.java |   5 +-
 .../org/apache/kudu/client/TestMiniKdc.java     |  22 ++++
 .../apache/kudu/client/TestMiniKuduCluster.java |   6 +-
 .../apache/kudu/client/TestRemoteTablet.java    |  18 ++-
 11 files changed, 284 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 91f03ff..c3fefab 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -27,13 +27,9 @@
 package org.apache.kudu.client;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
 import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.security.AccessControlContext;
-import java.security.AccessController;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -46,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+
 import javax.annotation.concurrent.GuardedBy;
 import javax.security.auth.Subject;
 
@@ -57,6 +54,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Message;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -75,6 +73,7 @@ import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
 import org.apache.kudu.util.AsyncUtil;
 import org.apache.kudu.util.NetUtil;
 import org.apache.kudu.util.Pair;
+import org.apache.kudu.util.SecurityUtil;
 
 /**
  * A fully asynchronous and thread-safe client for Kudu.
@@ -1516,17 +1515,13 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return A live and initialized client for the specified master server.
    */
   TabletClient newMasterClient(HostAndPort masterHostPort) {
-    InetAddress inetAddress = NetUtil.getInetAddress((masterHostPort.getHostText()));
-    if (inetAddress == null) {
-      return null;
-    }
     // We should pass a UUID here but we have a chicken and egg problem, we first need to
     // communicate with the masters to find out about them, and that's what we're trying
to do.
     // The UUID is used for logging, so instead we're passing the "master table name" followed
by
     // host and port which is enough to identify the node we're connecting to.
     return connectionCache.newClient(
         MASTER_TABLE_NAME_PLACEHOLDER + " - " + masterHostPort.toString(),
-        inetAddress, masterHostPort.getPort());
+        masterHostPort);
   }
 
   /**
@@ -1830,8 +1825,7 @@ public class AsyncKuduClient implements AutoCloseable {
      * @return a new asynchronous Kudu client
      */
     public AsyncKuduClient build() {
-      AccessControlContext context = AccessController.getContext();
-      subject = Subject.getSubject(context);
+      subject = SecurityUtil.getSubjectOrLogin();
       return new AsyncKuduClient(this);
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index c19dcae..1115aed 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -31,6 +31,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.net.HostAndPort;
 import com.stumbleupon.async.Deferred;
 import org.jboss.netty.channel.DefaultChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
@@ -108,18 +109,23 @@ class ConnectionCache {
     // from meta_cache.cc
     // TODO: if the TS advertises multiple host/ports, pick the right one
     // based on some kind of policy. For now just use the first always.
-    InetAddress inetAddress = NetUtil.getInetAddress(addresses.get(0).getHost());
+    HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0));
+    InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHostText());
     if (inetAddress == null) {
       throw new UnknownHostException(
           "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
     }
-    return newClient(uuid, inetAddress, addresses.get(0).getPort()).getServerInfo();
+    return newClient(new ServerInfo(uuid, hostPort, inetAddress)).getServerInfo();
   }
 
-  TabletClient newClient(String uuid, InetAddress inetAddress, int port) {
-    String host = inetAddress.getHostAddress();
-    boolean isLocal = NetUtil.isLocalAddress(inetAddress);
-    ServerInfo serverInfo = new ServerInfo(uuid, host, port, isLocal);
+  TabletClient newClient(String uuid, HostAndPort hostPort) {
+    InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHostText());
+    if (inetAddress == null) {
+      // TODO(todd): should we log the resolution failure? throw an exception?
+      return null;
+    }
+
+    ServerInfo serverInfo = new ServerInfo(uuid, hostPort, inetAddress);
     return newClient(serverInfo);
   }
 
@@ -148,8 +154,8 @@ class ConnectionCache {
     // Java since the JRE doesn't expose any way to call setsockopt() with
     // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
     config.setKeepAlive(true);
-    chan.connect(
-        new InetSocketAddress(serverInfo.getHostname(), serverInfo.getPort()));  // Won't
block.
+    chan.connect(new InetSocketAddress(serverInfo.getResolvedAddress(),
+                                       serverInfo.getPort())); // Won't block.
     return client;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
index 23bc35d..3ada9f1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
@@ -26,23 +26,29 @@
 
 package org.apache.kudu.client;
 
+import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.Set;
+
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
 import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.protobuf.ZeroCopyLiteralByteString;
+
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -60,8 +66,13 @@ public class SecureRpcHelper {
 
   private static final Map<String, String> SASL_PROPS = ImmutableMap.of();
   private static final SaslClientCallbackHandler SASL_CALLBACK = new SaslClientCallbackHandler();
-  private static final String[] MECHS = new String[] { "GSSAPI", "PLAIN" };
-  private static final String[] INSECURE_MECHS = new String[] { "PLAIN" };
+
+  /**
+   * List of SASL mechanisms supported by the client, in descending priority order.
+   * The client will pick the first of these mechanisms that is supported by
+   * the server and also succeeds to initialize.
+   */
+  private static final String[] PRIORITIZED_MECHS = new String[] { "GSSAPI", "PLAIN" };
 
   static final String USER_AND_PASSWORD = "java_client";
 
@@ -76,22 +87,6 @@ public class SecureRpcHelper {
 
   public SecureRpcHelper(final TabletClient client) {
     this.client = client;
-
-    Subject subject = client.getSubject();
-    boolean tryKerberos = subject != null &&
-                          !subject.getPrincipals(KerberosPrincipal.class).isEmpty();
-    String[] mechanisms = tryKerberos ? MECHS : INSECURE_MECHS;
-
-    try {
-      saslClient = Sasl.createSaslClient(mechanisms,
-                                         null,
-                                         "kudu",
-                                         client.getServerInfo().getHostname(),
-                                         SASL_PROPS,
-                                         SASL_CALLBACK);
-    } catch (Exception e) {
-      throw new RuntimeException("Could not create the SASL client", e);
-    }
   }
 
   public Set<RpcHeader.RpcFeatureFlag> getServerFeatures() {
@@ -126,7 +121,7 @@ public class SecureRpcHelper {
   }
 
   public ChannelBuffer handleResponse(ChannelBuffer buf, Channel chan) throws SaslException
{
-    if (!saslClient.isComplete() || negoUnderway) {
+    if (negoUnderway) {
       RpcHeader.NegotiatePB response = parseSaslMsgResponse(buf);
       switch (response.getStep()) {
         case NEGOTIATE:
@@ -201,13 +196,10 @@ public class SecureRpcHelper {
     return saslBuilder.build();
   }
 
+  private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
+      SaslException {
 
-  private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
SaslException {
-    RpcHeader.NegotiatePB.SaslMechanism negotiatedAuth = null;
-    for (RpcHeader.NegotiatePB.SaslMechanism auth : response.getSaslMechanismsList()) {
-      negotiatedAuth = auth;
-    }
-
+    // Store the supported features advertised by the server.
     ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
     for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
       if (SUPPORTED_RPC_FEATURES.contains(feature)) {
@@ -216,23 +208,60 @@ public class SecureRpcHelper {
     }
     serverFeatures = features.build();
 
-    byte[] saslToken = new byte[0];
-    if (saslClient.hasInitialResponse()) {
-      saslToken = saslClient.evaluateChallenge(saslToken);
+    // Gather the set of server-supported mechanisms.
+    Set<String> serverMechs = Sets.newHashSet();
+    for (RpcHeader.NegotiatePB.SaslMechanism mech : response.getSaslMechanismsList()) {
+      serverMechs.add(mech.getMechanism());
+    }
+
+    // For each of our own mechanisms, in descending priority, check if
+    // the server also supports them. If so, try to initialize saslClient.
+    // If we find a common mechanism that also can be successfully initialized,
+    // choose that mech.
+    byte[] initialResponse = null;
+    String chosenMech = null;
+    Map<String, String> errorsByMech = Maps.newHashMap();
+    for (String clientMech : PRIORITIZED_MECHS) {
+      if (!serverMechs.contains(clientMech)) {
+        errorsByMech.put(clientMech, "not advertised by server");
+        continue;
+      }
+      try {
+        saslClient = Sasl.createSaslClient(new String[]{ clientMech },
+                                           null,
+                                           "kudu",
+                                           client.getServerInfo().getHostname(),
+                                           SASL_PROPS,
+                                           SASL_CALLBACK);
+        if (saslClient.hasInitialResponse()) {
+          initialResponse = evaluateChallenge(new byte[0]);
+        }
+        chosenMech = clientMech;
+        break;
+      } catch (SaslException e) {
+        errorsByMech.put(clientMech, e.getMessage());
+        saslClient = null;
+      }
+    }
+
+    if (chosenMech == null) {
+      throw new SaslException("unable to negotiate a matching mechanism. Errors: [" +
+                              Joiner.on(",").withKeyValueSeparator(": ").join(errorsByMech)
+
+                              "]");
     }
 
     RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
-    if (saslToken != null) {
-      builder.setToken(ZeroCopyLiteralByteString.wrap(saslToken));
+    if (initialResponse != null) {
+      builder.setToken(ZeroCopyLiteralByteString.wrap(initialResponse));
     }
     builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_INITIATE);
-    builder.addSaslMechanisms(negotiatedAuth);
+    builder.addSaslMechanismsBuilder().setMechanism(chosenMech);
     sendSaslMessage(chan, builder.build());
   }
 
   private void handleChallengeResponse(Channel chan, RpcHeader.NegotiatePB response) throws
       SaslException {
-    byte[] saslToken = saslClient.evaluateChallenge(response.getToken().toByteArray());
+    byte[] saslToken = evaluateChallenge(response.getToken().toByteArray());
     if (saslToken == null) {
       throw new IllegalStateException("Not expecting an empty token");
     }
@@ -248,6 +277,21 @@ public class SecureRpcHelper {
     client.sendContext(chan);
   }
 
+  private byte[] evaluateChallenge(final byte[] challenge) throws SaslException {
+    final Subject subject = client.getSubject();
+    try {
+      return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+          @Override
+          public byte[] run() throws Exception {
+            return saslClient.evaluateChallenge(challenge);
+          }
+        });
+    } catch (Exception e) {
+      Throwables.propagateIfInstanceOf(e, SaslException.class);
+      throw Throwables.propagate(e);
+    }
+  }
+
   private static class SaslClientCallbackHandler implements CallbackHandler {
     public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
       for (Callback callback : callbacks) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
index 39ac597..ce72f0e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
@@ -17,7 +17,12 @@
 
 package org.apache.kudu.client;
 
+import java.net.InetAddress;
+
+import com.google.common.net.HostAndPort;
+
 import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.util.NetUtil;
 
 /**
  * Container class for server information that never changes, like UUID and hostname.
@@ -25,8 +30,8 @@ import org.apache.kudu.annotations.InterfaceAudience;
 @InterfaceAudience.Private
 public class ServerInfo {
   private final String uuid;
-  private final String hostname;
-  private final int port;
+  private final HostAndPort hostPort;
+  private final InetAddress resolvedAddr;
   private final boolean local;
 
   /**
@@ -37,11 +42,11 @@ public class ServerInfo {
    * @param port server's port
    * @param local if the server is hosted on the same machine where this client is running
    */
-  public ServerInfo(String uuid, String hostname, int port, boolean local) {
+  public ServerInfo(String uuid, HostAndPort hostPort, InetAddress resolvedAddr) {
     this.uuid = uuid;
-    this.hostname = hostname;
-    this.port = port;
-    this.local = local;
+    this.hostPort = hostPort;
+    this.resolvedAddr = resolvedAddr;
+    this.local = NetUtil.isLocalAddress(resolvedAddr);
   }
 
   /**
@@ -58,7 +63,7 @@ public class ServerInfo {
    * @return a string that contains this server's hostname
    */
   public String getHostname() {
-    return hostname;
+    return hostPort.getHostText();
   }
 
   /**
@@ -66,7 +71,7 @@ public class ServerInfo {
    * @return a port number that this server is bound to
    */
   public int getPort() {
-    return port;
+    return hostPort.getPort();
   }
 
   /**
@@ -76,4 +81,11 @@ public class ServerInfo {
   public boolean isLocal() {
     return local;
   }
+
+  /**
+   * @return the cached resolved address for this server
+   */
+  public InetAddress getResolvedAddress() {
+    return resolvedAddr;
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
new file mode 100644
index 0000000..68d9b00
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.util;
+
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+import com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+
+@InterfaceAudience.Private
+public abstract class SecurityUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(SecurityUtil.class);
+  public static final String KUDU_TICKETCACHE_PROPERTY = "kudu.krb5ccname";
+
+  /**
+   * Return the Subject associated with the current thread's AccessController,
+   * if that subject has Kerberos credentials. If there is no such subject, or
+   * the subject has no Kerberos credentials, logins in a new subject from the
+   * currently configured TicketCache.
+   */
+  public static Subject getSubjectOrLogin() {
+    AccessControlContext context = AccessController.getContext();
+    Subject subject = Subject.getSubject(context);
+    if (subject != null &&
+        !subject.getPrincipals(KerberosPrincipal.class).isEmpty()) {
+      LOG.debug("Using existing subject with Kerberos credentials: {}",
+          subject.toString());
+      return subject;
+    }
+    // If there isn't any current subject with krb5 principals, try to login
+    // using the ticket cache.
+    Configuration conf = new Configuration() {
+      @Override
+      public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+        Map<String, String> options = new HashMap<>();
+
+        // TODO: should we offer some kind of "renewal thread" or
+        // "reacquire from keytab thread" like Hadoop does?
+        options.put("useTicketCache", "true");
+        options.put("doNotPrompt", "true");
+        options.put("refreshKrb5Config", "true");
+
+        // Allow configuring debug by a system property.
+        options.put("debug", Boolean.toString(Boolean.getBoolean("kudu.jaas.debug")));
+
+        // Look for the ticket cache specified in one of the following ways:
+        // 1) in a Kudu-specific system property (this is convenient for testing)
+        // 2) in the KRB5CCNAME environment variable
+        // 3) the Java default (by not setting any value)
+        String ticketCache = System.getProperty(KUDU_TICKETCACHE_PROPERTY,
+            System.getenv("KRB5CCNAME"));
+        if (ticketCache != null) {
+          LOG.debug("Using ticketCache: {}", ticketCache);
+          options.put("ticketCache", ticketCache);
+        }
+        options.put("renewTGT", "true");
+
+        return new AppConfigurationEntry[] { new AppConfigurationEntry(
+            "com.sun.security.auth.module.Krb5LoginModule",
+            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options) };
+      }
+    };
+    try {
+      LoginContext loginContext = new LoginContext("kudu", new Subject(), null, conf);
+      loginContext.login();
+      subject = loginContext.getSubject();
+      LOG.debug("Logged in as subject: {}", Joiner.on(",").join(subject.getPrincipals()));
+      return subject;
+    } catch (LoginException e) {
+      LOG.debug("Could not login via JAAS. Using no credentials", e);
+      return null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
index 9d6916a..b36e344 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+
 import javax.annotation.concurrent.NotThreadSafe;
 
 import com.google.common.base.Charsets;
@@ -36,10 +37,10 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.CharStreams;
+
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.kudu.annotations.InterfaceAudience;
 
 /**
@@ -318,11 +319,18 @@ public class MiniKdc implements Closeable {
     return ImmutableMap.of(
         "KRB5_CONFIG", options.dataRoot.resolve("krb5.conf").toString(),
         "KRB5_KDC_PROFILE", options.dataRoot.resolve("kdc.conf").toString(),
-        "KRB5CCNAME", options.dataRoot.resolve("krb5cc").toString(),
+        "KRB5CCNAME", getTicketCachePath(),
         "KUDU_ENABLE_KRB5_REALM_FIX", "yes"
     );
   }
 
+  /**
+   * @return the path of the Kerberos ticket/credential cache
+   */
+  public String getTicketCachePath() {
+    return options.dataRoot.resolve("krb5cc").toString();
+  }
+
   private Process startProcessWithKrbEnv(String... argv) throws IOException {
 
     ProcessBuilder procBuilder = new ProcessBuilder(argv);

http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index 3a05ccc..3b9bb49 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -20,19 +20,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.file.Path;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import javax.security.auth.Subject;
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import javax.security.auth.login.LoginContext;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -42,11 +36,12 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.net.HostAndPort;
+
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.kudu.util.NetUtil;
+import org.apache.kudu.util.SecurityUtil;
 
 /**
  * Utility class to start and manipulate Kudu clusters. Relies on being IN the Kudu source
code with
@@ -87,7 +82,6 @@ public class MiniKuduCluster implements AutoCloseable {
   private final String bindHost = TestUtils.getUniqueLocalhost();
   private Path keytab;
   private MiniKdc miniKdc;
-  private Subject subject;
 
   private MiniKuduCluster(final int defaultTimeoutMs,
                           final List<String> extraTserverFlags,
@@ -109,33 +103,10 @@ public class MiniKuduCluster implements AutoCloseable {
 
     miniKdc.createUserPrincipal("testuser");
     miniKdc.kinit("testuser");
-    System.setProperty("java.security.krb5.conf", miniKdc.getEnvVars().get("KRB5_CONFIG"));
-
-    Configuration conf = new Configuration() {
-      @Override
-      public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
-        Map<String, String> options = new HashMap<>();
-        options.put("useKeyTab", "true");
-        options.put("useTicketCache", "true");
-        options.put("ticketCache", miniKdc.getEnvVars().get("KRB5CCNAME"));
-        options.put("principal", "testuser");
-        options.put("doNotPrompt", "true");
-        options.put("renewTGT", "true");
-        options.put("debug", "true");
-
-        return new AppConfigurationEntry[] {
-          new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
-                                    AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
-                                    options)
-        };
-      }
-    };
-
-    LoginContext context = new LoginContext("com.sun.security.auth.module.Krb5LoginModule",
-                                            new Subject(), null, conf);
-    context.login();
-    context.getSubject();
-    subject = context.getSubject();
+    System.setProperty("java.security.krb5.conf",
+        miniKdc.getEnvVars().get("KRB5_CONFIG"));
+    System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY,
+        miniKdc.getEnvVars().get("KRB5CCNAME"));
   }
 
   /**
@@ -146,22 +117,11 @@ public class MiniKuduCluster implements AutoCloseable {
   private void start(int numMasters, int numTservers) throws Exception {
     startCluster(numMasters, numTservers);
 
-    PrivilegedAction<KuduClient> createClient = new PrivilegedAction<KuduClient>()
{
-      @Override
-      public KuduClient run() {
-        KuduClient.KuduClientBuilder kuduClientBuilder =
-            new KuduClient.KuduClientBuilder(getMasterAddresses());
-        kuduClientBuilder.defaultAdminOperationTimeoutMs(defaultTimeoutMs);
-        kuduClientBuilder.defaultOperationTimeoutMs(defaultTimeoutMs);
-        return kuduClientBuilder.build();
-      }
-    };
-
-    if (subject != null) {
-      syncClient = Subject.doAs(subject, createClient);
-    } else {
-      syncClient = createClient.run();
-    }
+    KuduClient.KuduClientBuilder kuduClientBuilder =
+        new KuduClient.KuduClientBuilder(getMasterAddresses());
+    kuduClientBuilder.defaultAdminOperationTimeoutMs(defaultTimeoutMs);
+    kuduClientBuilder.defaultOperationTimeoutMs(defaultTimeoutMs);
+    syncClient = kuduClientBuilder.build();
   }
 
   /**
@@ -533,14 +493,6 @@ public class MiniKuduCluster implements AutoCloseable {
   }
 
   /**
-   * @return authenticated user credentials for this cluster,
-   *         or {@code null} if it is not a secure cluster.
-   */
-  public Subject getLoggedInSubject() {
-    return subject;
-  }
-
-  /**
    * Helper runnable that receives stdout and logs it along with the process' identifier.
    */
   public static class ProcessInputStreamLogPrinterRunnable implements Runnable {

http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index 57eceef..5e67381 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.net.InetAddress;
 import java.util.List;
 
+import org.apache.kudu.util.NetUtil;
 import com.google.common.net.HostAndPort;
 import com.stumbleupon.async.Deferred;
 import org.junit.Test;
@@ -47,8 +49,9 @@ public class TestConnectionCache {
       int i = 0;
       for (HostAndPort hp : addresses) {
         // Ping the process so we go through the whole connection process.
+        InetAddress addr = NetUtil.getInetAddress(hp.getHostText());
         TabletClient conn =
-            cache.newClient(new ServerInfo(i + "", hp.getHostText(), hp.getPort(), false));
+            cache.newClient(new ServerInfo(i + "", hp, addr));
         pingConnection(conn);
         i++;
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
index 9feb56f..51d40be 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
@@ -18,7 +18,11 @@ package org.apache.kudu.client;
 
 import static org.junit.Assert.*;
 
+import javax.security.auth.Subject;
+
+import org.apache.kudu.util.SecurityUtil;
 import org.junit.Test;
+import static org.hamcrest.CoreMatchers.containsString;
 
 public class TestMiniKdc {
 
@@ -46,6 +50,24 @@ public class TestMiniKdc {
     }
   }
 
+  /**
+   * Test that we can initialize a JAAS Subject from a user-provided TicketCache.
+   */
+  @Test
+  public void testGetKerberosSubject() throws Exception {
+    try (MiniKdc kdc = MiniKdc.withDefaults()) {
+      kdc.start();
+      kdc.createUserPrincipal("alice");
+      kdc.kinit("alice");
+      // Typically this would be picked up from the $KRB5CCNAME environment
+      // variable, or use a default. However, it's not easy to modify the
+      // environment in Java, so instead we override a system property.
+      System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY, kdc.getTicketCachePath());
+      Subject subj = SecurityUtil.getSubjectOrLogin();
+      assertThat(subj.toString(), containsString("alice"));
+    }
+  }
+
   @Test
   public void testStopClose() throws Exception {
     // Test that closing a stopped KDC does not throw.

http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
index fbbe245..34b6090 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
@@ -76,11 +76,7 @@ public class TestMiniKuduCluster {
                                                       .numTservers(NUM_TABLET_SERVERS)
                                                       .enableKerberos()
                                                       .build()) {
-      try {
-        assertTrue(cluster.waitForTabletServers(NUM_TABLET_SERVERS));
-      } catch (RuntimeException e) {
-        assertTrue(e.getMessage().contains("incompatible RPC?"));
-      }
+      assertTrue(cluster.waitForTabletServers(NUM_TABLET_SERVERS));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/716a1fa1/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
index 22cfec0..bed7e4b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
@@ -21,9 +21,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.net.HostAndPort;
 import com.google.protobuf.ByteString;
 import org.junit.Test;
 
@@ -124,8 +127,21 @@ public class TestRemoteTablet {
     tabletPb.setTabletId(ByteString.copyFromUtf8("fake tablet"));
     List<ServerInfo> servers = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
+      InetAddress addr;
+      try {
+        if (i == localReplicaIndex) {
+          addr = InetAddress.getByName("127.0.0.1");
+        } else {
+          addr = InetAddress.getByName("1.2.3.4");
+        }
+      } catch (UnknownHostException e) {
+        throw new RuntimeException(e);
+      }
+
       String uuid = i + "";
-      servers.add(new ServerInfo(uuid, "host", i, i == localReplicaIndex));
+      servers.add(new ServerInfo(uuid,
+                                 HostAndPort.fromParts("host", i),
+                                 addr));
       tabletPb.addReplicas(TestUtils.getFakeTabletReplicaPB(
           uuid, "host", i,
           leaderIndex == i ? Metadata.RaftPeerPB.Role.LEADER : Metadata.RaftPeerPB.Role.FOLLOWER));


Mime
View raw message