kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] 02/03: KUDU-2543 pt 3 java: pass around authz tokens
Date Mon, 11 Mar 2019 21:09:14 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7645f5b2ea1f54777ce88c61df54422b068142e5
Author: Andrew Wong <awong@cloudera.com>
AuthorDate: Sun Jan 6 21:02:46 2019 -0800

    KUDU-2543 pt 3 java: pass around authz tokens
    
    Adds handling of authz tokens to the Java client. The Java client will
    now cache tokens upon opening a table, and use them for RPCs that need
    them (e.g. Writes and Scans), reacquiring them when receiving word that
    they are expired.
    
    This is tested as follows:
    - TestAuthnTokenReacquire's test for scans and writes is repurposed to
      also test for reacquisition of authz tokens when they expire
    - basic tests are added to test the token cache
    - a test is added to test authz reacquisition in the case that a
      multi-master deployment undergoes a leadership change
    - a test is added to test authz reacquisition upon invalid or expired
      tokens during prolonged workloads against a multi-master deployment
    
    Change-Id: Iadd5f7709b45628d7ddd9e2b100d0dfaabbf15af
    Reviewed-on: http://gerrit.cloudera.org:8080/12279
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
    Reviewed-by: Hao Hao <hao.hao@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    |  53 ++++-
 .../org/apache/kudu/client/AsyncKuduScanner.java   |  17 ++
 .../org/apache/kudu/client/AuthzTokenCache.java    | 243 +++++++++++++++++++++
 .../main/java/org/apache/kudu/client/Batch.java    |  17 ++
 .../java/org/apache/kudu/client/Connection.java    |  10 +-
 .../apache/kudu/client/GetTableSchemaRequest.java  |  21 +-
 .../apache/kudu/client/GetTableSchemaResponse.java |  15 +-
 .../kudu/client/InvalidAuthzTokenException.java    |  40 ++++
 .../main/java/org/apache/kudu/client/KuduRpc.java  |  13 ++
 .../java/org/apache/kudu/client/Operation.java     |  16 ++
 .../main/java/org/apache/kudu/client/RpcProxy.java |  13 +-
 ...nReacquire.java => TestAuthTokenReacquire.java} | 110 +++++++---
 .../apache/kudu/client/TestAuthzTokenCache.java    | 151 +++++++++++++
 .../kudu/client/TestMultiMasterAuthzTokens.java    | 184 ++++++++++++++++
 .../java/org/apache/kudu/test/ClientTestUtil.java  |  12 +
 15 files changed, 872 insertions(+), 43 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index cf22d0a..3bdb56b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -56,6 +56,7 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import org.apache.kudu.security.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -68,6 +69,7 @@ import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.kudu.security.Token.SignedTokenPB;
 import org.apache.kudu.Common;
 import org.apache.kudu.Schema;
 import org.apache.kudu.master.Master;
@@ -116,8 +118,8 @@ import org.apache.kudu.util.Pair;
  * Authentication and Authorization Service</em> (JAAS) API provided by the JDK.
  * JAAS provides a common way for applications to initialize Kerberos
  * credentials, store these credentials in a {@link javax.security.auth.Subject}
- * instance, and associate the Subject the current thread of execution. The Kudu
- * client then accesses the Kerberos credentials in the
+ * instance, and associate the Subject with the current thread of execution.
+ * The Kudu client then accesses the Kerberos credentials in the
  * {@link javax.security.auth.Subject} and uses them to authenticate to the
  * remote cluster as necessary.
  * <p>
@@ -355,6 +357,9 @@ public class AsyncKuduClient implements AutoCloseable {
   /** A helper to facilitate re-acquiring of authentication token if current one expires. */
   private final AuthnTokenReacquirer tokenReacquirer;
 
+  /** A helper to facilitate retrieving authz tokens */
+  private final AuthzTokenCache authzTokenCache;
+
   private volatile boolean closed;
 
   private AsyncKuduClient(AsyncKuduClientBuilder b) {
@@ -373,6 +378,7 @@ public class AsyncKuduClient implements AutoCloseable {
     this.connectionCache = new ConnectionCache(
         securityContext, timer, channelFactory);
     this.tokenReacquirer = new AuthnTokenReacquirer(this);
+    this.authzTokenCache = new AuthzTokenCache(this);
   }
 
   /**
@@ -756,11 +762,14 @@ public class AsyncKuduClient implements AutoCloseable {
     Preconditions.checkNotNull(tableName);
 
     // Prefer a lookup by table ID over name, since the former is immutable.
+    // For backwards compatibility with older tservers, we don't require authz
+    // token support.
     GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable,
                                                           tableId,
                                                           tableId != null ? null : tableName,
                                                           timer,
-                                                          defaultAdminOperationTimeoutMs);
+                                                          defaultAdminOperationTimeoutMs,
+                                                          /*requiresAuthzTokenSupport=*/false);
 
     rpc.setParentRpc(parent);
     return sendRpcToTablet(rpc).addCallback(new Callback<KuduTable, GetTableSchemaResponse>() {
@@ -773,6 +782,10 @@ public class AsyncKuduClient implements AutoCloseable {
         if (cache != null) {
           cache.clearNonCoveredRangeEntries();
         }
+        SignedTokenPB authzToken = resp.getAuthzToken();
+        if (authzToken != null) {
+          authzTokenCache.put(resp.getTableId(), authzToken);
+        }
 
         LOG.debug("Opened table {}", resp.getTableId());
         return new KuduTable(AsyncKuduClient.this,
@@ -898,6 +911,11 @@ public class AsyncKuduClient implements AutoCloseable {
               }));
   }
 
+  @InterfaceAudience.LimitedPrivate("Test")
+  public AuthzTokenCache getAuthzTokenCache() {
+    return this.authzTokenCache;
+  }
+
   /**
    * Get the Hive Metastore configuration of the most recently connected-to leader master, or
    * {@code null} if the Hive Metastore integration is not enabled.
@@ -1955,17 +1973,40 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Handle a RPC failed due to invalid authn token error. In short, connect to the Kudu cluster
+   * Handle an RPC failed due to invalid authn token error. In short, connect to the Kudu cluster
    * to acquire a new authentication token and retry the RPC once a new authentication token
    * is put into the {@link #securityContext}.
    *
-   * @param rpc the RPC which failed do to invalid authn token
+   * @param rpc the RPC which failed with an invalid authn token
    */
-  <R> void handleInvalidToken(KuduRpc<R> rpc) {
+  <R> void handleInvalidAuthnToken(KuduRpc<R> rpc) {
+    // TODO(awong): plumb the offending KuduException into the reacquirer.
     tokenReacquirer.handleAuthnTokenExpiration(rpc);
   }
 
   /**
+   * Handle an RPC that failed due to an invalid authorization token error. The
+   * RPC will be retried after fetching a new authz token.
+   *
+   * @param rpc the RPC that failed with an invalid authz token
+   * @param ex the KuduException that led to this handling
+   */
+  <R> void handleInvalidAuthzToken(KuduRpc<R> rpc, KuduException ex) {
+    authzTokenCache.retrieveAuthzToken(rpc, ex);
+  }
+
+  /**
+   * Gets an authorization token for the given table from the cache, or nullptr
+   * if none exists.
+   *
+   * @param tableId the table ID for which to get an authz token
+   * @return a signed authz token for the table
+   */
+  SignedTokenPB getAuthzToken(String tableId) {
+    return authzTokenCache.get(tableId);
+  }
+
+  /**
    * This methods enable putting RPCs on hold for a period of time determined by
    * {@link #getSleepTimeForRpcMillis(KuduRpc)}. If the RPC is out of time/retries, its errback will
    * be immediately called.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 93bafb8..68a0982 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -43,6 +43,7 @@ import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import org.apache.kudu.security.Token;
 import org.apache.kudu.tserver.Tserver.ScannerKeepAliveRequestPB;
 import org.apache.kudu.tserver.Tserver.ScannerKeepAliveResponsePB;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -886,6 +887,9 @@ public final class AsyncKuduScanner {
 
     State state;
 
+    /** The token with which to authorize this RPC. */
+    private Token.SignedTokenPB authzToken;
+
     ScanRequest(KuduTable table, State state, RemoteTablet tablet) {
       super(table, client.getTimer(), scanRequestTimeout);
       setTablet(tablet);
@@ -916,6 +920,16 @@ public final class AsyncKuduScanner {
       return replicaSelection;
     }
 
+    @Override
+    boolean needsAuthzToken() {
+      return true;
+    }
+
+    @Override
+    void bindAuthzToken(Token.SignedTokenPB token) {
+      authzToken = token;
+    }
+
     /** Serializes this request.  */
     @Override
     Message createRequestPB() {
@@ -968,6 +982,9 @@ public final class AsyncKuduScanner {
           for (KuduPredicate pred : predicates.values()) {
             newBuilder.addColumnPredicates(pred.toPB());
           }
+          if (authzToken != null) {
+            newBuilder.setAuthzToken(authzToken);
+          }
           builder.setNewScanRequest(newBuilder.build())
                  .setBatchSizeBytes(batchSizeBytes);
           break;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AuthzTokenCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AuthzTokenCache.java
new file mode 100644
index 0000000..1dd17e1
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AuthzTokenCache.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.client;
+
+import com.google.common.base.Preconditions;
+import com.stumbleupon.async.Callback;
+import org.apache.kudu.security.Token;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Cache for authz tokens received from the master of unbounded capacity. A
+ * client will receive an authz token upon opening a table and put it into the
+ * cache. A subsequent operation that requires an authz token (e.g. writes,
+ * scans) will fetch it from the cache and attach it to the operation request.
+ */
+@ThreadSafe
+@InterfaceAudience.Private
+public class AuthzTokenCache {
+  private static class RpcAndException {
+    final KuduRpc<?> rpc;
+    final KuduException ex;
+
+    RpcAndException(KuduRpc<?> rpc, KuduException ex) {
+      this.rpc = rpc;
+      this.ex = ex;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AuthzTokenCache.class);
+  private final AsyncKuduClient client;
+
+  // Map from a table ID to an authz token for that table.
+  private final ConcurrentHashMap<String, Token.SignedTokenPB> authzTokens = new ConcurrentHashMap<>();
+
+  // Map from a table ID that has an in-flight RPC to get a new authz token, to
+  // the list of RPCs waiting to be retried once that token is received and the
+  // exception each is handling.
+  // Note: Unlike the token map which is synchronized to make it threadsafe,
+  // synchronization of this map also serves to ensure requests for the same
+  // table ID get grouped together.
+  @GuardedBy("retriesLock")
+  private final Map<String, List<RpcAndException>>
+      retriesForTable = new HashMap<>();
+  private final Object retriesLock = new Object();
+
+  // Number of RPCs sent to retrieve authz tokens. Useful for testing.
+  private AtomicInteger numRetrievalsSent;
+
+  /**
+   * Create a new AuthzTokenCache object.
+   *
+   * @param client the Kudu client object with which to send requests.
+   */
+  AuthzTokenCache(@Nonnull AsyncKuduClient client) {
+    this.client = client;
+    numRetrievalsSent = new AtomicInteger(0);
+  }
+
+  /**
+   * Returns the number of RPCs sent to retrieve authz token over the lifetime
+   * of this cache.
+   * @return number of RPCs sent
+   */
+  @InterfaceAudience.LimitedPrivate("Test")
+  int numRetrievalsSent() {
+    return numRetrievalsSent.get();
+  }
+
+  /**
+   * Puts the given token into the cache. No validation is done on the validity
+   * or expiration of the token -- that happens on the tablet servers.
+   *
+   * @param tableId the table ID the authz token is for
+   * @param token an authz token to put into the cache
+   */
+  void put(@Nonnull String tableId, @Nonnull Token.SignedTokenPB token) {
+    authzTokens.put(tableId, token);
+  }
+
+  /**
+   * Returns the cached token for the given 'tableId' if one exists.
+   *
+   * @param tableId table ID to get an authz token for
+   * @return the token for the table ID if one exists
+   */
+  Token.SignedTokenPB get(@Nonnull String tableId) {
+    return authzTokens.get(tableId);
+  }
+
+  /**
+   * Returns the list of pending RPCs waiting on a new authz token for the given
+   * table, clearing the table's entry in the pending map.
+   *
+   * @param tableId the table ID whose RPCs should be cleared
+   * @return the RPCs to be retried for the given table ID and the
+   */
+  private List<RpcAndException> clearPendingRetries(@Nonnull String tableId) {
+    List<RpcAndException> pendingRetries;
+    synchronized (retriesLock) {
+      pendingRetries = retriesForTable.remove(tableId);
+    }
+    Preconditions.checkState(!pendingRetries.isEmpty(),
+        "no pending retries for table " + tableId);
+    return pendingRetries;
+  }
+
+  /**
+   * Sends an RPC to retrieve an authz token for retrying the specified parent
+   * RPC, calling 'cb' on success and 'eb' on failure.
+   *
+   * 'parentRpc' is used for logging and deadline tracking.
+   *
+   * @param parentRpc the RPC that is waiting on the authz token
+   * @param cb callback to be called after receiving a response from the master
+   * @param eb errback to be called after hitting an exception
+   */
+  private void sendRetrievalForRpc(@Nonnull KuduRpc<?> parentRpc,
+                                   @Nonnull Callback<Void, GetTableSchemaResponse> cb,
+                                   @Nonnull Callback<Void, Exception> eb) {
+    String tableId = parentRpc.getTable().getTableId();
+    LOG.debug("sending RPC to retrieve token for table ID " + tableId);
+    GetTableSchemaRequest retrieveAuthzTokenReq = new GetTableSchemaRequest(
+        client.getMasterTable(), tableId, /*name=*/null, client.getTimer(),
+        client.getDefaultAdminOperationTimeoutMs(), /*requiresAuthzTokenSupport=*/true);
+    retrieveAuthzTokenReq.setParentRpc(parentRpc);
+    retrieveAuthzTokenReq.deadlineTracker.setDeadline(parentRpc.deadlineTracker.getDeadline());
+    numRetrievalsSent.incrementAndGet();
+    client.sendRpcToTablet(retrieveAuthzTokenReq).addCallback(cb)
+                                                 .addErrback(eb);
+  }
+
+  /**
+   * Method to call upon receiving an RPC that indicates it had an invalid authz
+   * token and needs a new one. If there is already an in-flight RPC to retrieve
+   * a new authz token for the given table, add the 'rpc' to the collection of
+   * RPCs to be retried once the retrieval completes.
+   *
+   * @param rpc the RPC that needs a new authz token
+   * @param ex error that caused triggered this retrieval
+   * @param <R> the RPC type
+   */
+  <R> void retrieveAuthzToken(@Nonnull final KuduRpc<R> rpc, @Nonnull final KuduException ex) {
+    /**
+     * Handles a response from getting an authz token.
+     */
+    final class NewAuthzTokenCB implements Callback<Void, GetTableSchemaResponse> {
+      private final String tableId;
+
+      public NewAuthzTokenCB(String tableId) {
+        this.tableId = tableId;
+      }
+
+      @Override
+      public Void call(@Nonnull GetTableSchemaResponse resp) throws Exception {
+        if (resp.getAuthzToken() == null) {
+          // Note: If we were talking to an old master, we would hit an
+          // exception earlier in the RPC handling.
+          throw new NonRecoverableException(
+              Status.InvalidArgument("no authz token retrieved for " + tableId));
+        }
+        LOG.debug("retrieved authz token for " + tableId);
+        put(tableId, resp.getAuthzToken());
+        for (RpcAndException rpcAndEx : clearPendingRetries(tableId)) {
+          client.handleRetryableErrorNoDelay(rpcAndEx.rpc, rpcAndEx.ex);
+        }
+        return null;
+      }
+    }
+
+    /**
+     * Handles the case where there was an error getting the new authz token.
+     */
+    final class NewAuthzTokenErrB implements Callback<Void, Exception> {
+      private KuduRpc<?> parentRpc;
+      private final NewAuthzTokenCB cb;
+
+      public NewAuthzTokenErrB(@Nonnull NewAuthzTokenCB cb, @Nonnull KuduRpc<?> parentRpc) {
+        this.cb = cb;
+        this.parentRpc = parentRpc;
+      }
+
+      @Override
+      public Void call(@Nonnull Exception e) {
+        String tableId = cb.tableId;
+        if (e instanceof RecoverableException) {
+          sendRetrievalForRpc(parentRpc, cb, this);
+        } else {
+          for (RpcAndException rpcAndEx : clearPendingRetries(tableId)) {
+            rpcAndEx.rpc.errback(e);
+          }
+        }
+        return null;
+      }
+    }
+
+    final String tableId = rpc.getTable().getTableId();
+    RpcAndException rpcAndEx = new RpcAndException(rpc, ex);
+    synchronized (retriesLock) {
+      List<RpcAndException> pendingRetries = retriesForTable.putIfAbsent(
+          tableId, new ArrayList<>(Arrays.asList(rpcAndEx)));
+      if (pendingRetries == null) {
+        // There isn't an in-flight RPC to retrieve a new authz token.
+        NewAuthzTokenCB newTokenCB = new NewAuthzTokenCB(tableId);
+        NewAuthzTokenErrB newTokenErrB = new NewAuthzTokenErrB(newTokenCB, rpc);
+        sendRetrievalForRpc(rpc, newTokenCB, newTokenErrB);
+      } else {
+        Preconditions.checkState(!pendingRetries.isEmpty(),
+            "no pending retries for table " + tableId);
+        pendingRetries.add(rpcAndEx);
+      }
+    }
+  }
+}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index b08e246..9685923 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -24,6 +24,7 @@ import java.util.List;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
+import org.apache.kudu.security.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.jboss.netty.util.Timer;
 
@@ -49,6 +50,9 @@ class Batch extends KuduRpc<BatchResponse> {
   /** The tablet this batch will be routed to. */
   private final LocatedTablet tablet;
 
+  /** The token with which to authorize this RPC. */
+  private Token.SignedTokenPB authzToken;
+
   /**
    * This size will be set when serialize is called. It stands for the size of rows in all
    * operations in this batch.
@@ -105,6 +109,16 @@ class Batch extends KuduRpc<BatchResponse> {
   }
 
   @Override
+  boolean needsAuthzToken() {
+    return true;
+  }
+
+  @Override
+  void bindAuthzToken(Token.SignedTokenPB token) {
+    authzToken = token;
+  }
+
+  @Override
   Message createRequestPB() {
     final Tserver.WriteRequestPB.Builder builder =
         Operation.createAndFillWriteRequestPB(operations);
@@ -112,6 +126,9 @@ class Batch extends KuduRpc<BatchResponse> {
                              (long)builder.getRowOperations().getIndirectData().size();
     builder.setTabletId(UnsafeByteOperations.unsafeWrap(getTablet().getTabletIdAsBytes()));
     builder.setExternalConsistencyMode(externalConsistencyMode.pbVersion());
+    if (authzToken != null) {
+      builder.setAuthzToken(authzToken);
+    }
     return builder.build();
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index ba188f0..5da0a8c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -394,12 +394,18 @@ class Connection extends SimpleChannelUpstreamHandler {
     final RpcHeader.ErrorStatusPB.Builder errorBuilder = RpcHeader.ErrorStatusPB.newBuilder();
     KuduRpc.readProtobuf(response.getPBMessage(), errorBuilder);
     final RpcHeader.ErrorStatusPB error = errorBuilder.build();
-    if (error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) ||
-        error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) {
+    RpcHeader.ErrorStatusPB.RpcErrorCodePB code = error.getCode();
+    if (code.equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) ||
+        code.equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) {
       responseCbk.call(new CallResponseInfo(
           response, new RecoverableException(Status.ServiceUnavailable(error.getMessage()))));
       return;
     }
+    if (code.equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_INVALID_AUTHORIZATION_TOKEN)) {
+      responseCbk.call(new CallResponseInfo(
+          response, new InvalidAuthzTokenException(Status.NotAuthorized(error.getMessage()))));
+      return;
+    }
 
     final String message = getLogPrefix() + " server sent error " + error.getMessage();
     LOG.error(message); // can be useful
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
index d3e48e6..0e2ec55 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
@@ -22,8 +22,10 @@ import static org.apache.kudu.master.Master.GetTableSchemaResponsePB;
 import static org.apache.kudu.master.Master.TableIdentifierPB;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
+import org.apache.kudu.master.Master;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.jboss.netty.util.Timer;
 
@@ -31,6 +33,9 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.master.Master.TableIdentifierPB.Builder;
 import org.apache.kudu.util.Pair;
 
+import java.util.Collection;
+import java.util.List;
+
 /**
  * RPC to fetch a table's schema
  */
@@ -38,18 +43,22 @@ import org.apache.kudu.util.Pair;
 public class GetTableSchemaRequest extends KuduRpc<GetTableSchemaResponse> {
   private final String id;
   private final String name;
-
+  private final List<Integer> requiredFeatures;
 
   GetTableSchemaRequest(KuduTable masterTable,
                         String id,
                         String name,
                         Timer timer,
-                        long timeoutMillis) {
+                        long timeoutMillis,
+                        boolean requiresAuthzTokenSupport) {
     super(masterTable, timer, timeoutMillis);
     Preconditions.checkArgument(id != null ^ name != null,
         "Only one of table ID or the table name should be provided");
     this.id = id;
     this.name = name;
+    this.requiredFeatures = requiresAuthzTokenSupport ?
+        ImmutableList.of(Master.MasterFeatures.GENERATE_AUTHZ_TOKEN_VALUE) :
+        ImmutableList.<Integer>of();
   }
 
   @Override
@@ -89,8 +98,14 @@ public class GetTableSchemaRequest extends KuduRpc<GetTableSchemaResponse> {
         schema,
         respBuilder.getTableId().toStringUtf8(),
         respBuilder.getNumReplicas(),
-        ProtobufHelper.pbToPartitionSchema(respBuilder.getPartitionSchema(), schema));
+        ProtobufHelper.pbToPartitionSchema(respBuilder.getPartitionSchema(), schema),
+        respBuilder.hasAuthzToken() ? respBuilder.getAuthzToken() : null);
     return new Pair<GetTableSchemaResponse, Object>(
         response, respBuilder.hasError() ? respBuilder.getError() : null);
   }
+
+  @Override
+  Collection<Integer> getRequiredFeatures() {
+    return requiredFeatures;
+  }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
index a3d10ab..b018e0a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.kudu.security.Token.SignedTokenPB;
 import org.apache.kudu.Schema;
 
 @InterfaceAudience.Private
@@ -28,6 +29,7 @@ public class GetTableSchemaResponse extends KuduRpcResponse {
   private final PartitionSchema partitionSchema;
   private final String tableId;
   private final int numReplicas;
+  private final SignedTokenPB authzToken;
 
   /**
    * @param elapsedMillis Time in milliseconds since RPC creation to now
@@ -36,18 +38,21 @@ public class GetTableSchemaResponse extends KuduRpcResponse {
    * @param tableId the UUID of the table in the response
    * @param numReplicas the table's replication factor
    * @param partitionSchema the table's partition schema
+   * @param authzToken an authorization token for use with this table
    */
   GetTableSchemaResponse(long elapsedMillis,
                          String tsUUID,
                          Schema schema,
                          String tableId,
                          int numReplicas,
-                         PartitionSchema partitionSchema) {
+                         PartitionSchema partitionSchema,
+                         SignedTokenPB authzToken) {
     super(elapsedMillis, tsUUID);
     this.schema = schema;
     this.partitionSchema = partitionSchema;
     this.tableId = tableId;
     this.numReplicas = numReplicas;
+    this.authzToken = authzToken;
   }
 
   /**
@@ -81,4 +86,12 @@ public class GetTableSchemaResponse extends KuduRpcResponse {
   public int getNumReplicas() {
     return numReplicas;
   }
+
+  /**
+   * Get the authorization token for the table.
+   * @return the table's authz token
+   */
+  public SignedTokenPB getAuthzToken() {
+    return authzToken;
+  }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthzTokenException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthzTokenException.java
new file mode 100644
index 0000000..4adb82f
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthzTokenException.java
@@ -0,0 +1,40 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Exception for notifying of an invalid authorization token. In most use cases
+ * in the Kudu Java client code, 'invalid authz token' means 'expired authz
+ * token'. Receiving this exception means the authorization token used to make a
+ * request is no longer valid and a new one is needed to make requests that
+ * access data.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class InvalidAuthzTokenException extends RecoverableException {
+  /**
+   * @param status status object containing the reason for the exception trace
+   */
+  InvalidAuthzTokenException(Status status) {
+    super(status);
+  }
+}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index eb33c80..06cce0b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -40,6 +40,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 import com.google.protobuf.Message.Builder;
 import com.stumbleupon.async.Deferred;
+import org.apache.kudu.security.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -117,6 +118,18 @@ public abstract class KuduRpc<R> {
   }
 
   /**
+   * Binds the given authorization token to the request.
+   */
+  void bindAuthzToken(Token.SignedTokenPB token) {}
+
+  /**
+   * Whether the request needs to be authorized via authz token.
+   */
+  boolean needsAuthzToken() {
+    return false;
+  }
+
+  /**
    * The Deferred that will be invoked when this RPC completes or fails.
    * In case of a successful completion, this Deferred's first callback
    * will be invoked with an {@link Object} containing the de-serialized
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 38ba205..d1d9748 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
+import org.apache.kudu.security.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.jboss.netty.util.Timer;
@@ -86,6 +87,8 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
 
   private PartialRow row;
 
+  private Token.SignedTokenPB authzToken;
+
   /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
   boolean ignoreAllDuplicateRows = false;
 
@@ -149,6 +152,16 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
   }
 
   @Override
+  boolean needsAuthzToken() {
+    return true;
+  }
+
+  @Override
+  void bindAuthzToken(Token.SignedTokenPB token) {
+    authzToken = token;
+  }
+
+  @Override
   Message createRequestPB() {
     final Tserver.WriteRequestPB.Builder builder =
         createAndFillWriteRequestPB(ImmutableList.of(this));
@@ -159,6 +172,9 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
     if (this.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
       builder.setPropagatedTimestamp(this.propagatedTimestamp);
     }
+    if (authzToken != null) {
+      builder.setAuthzToken(authzToken);
+    }
     return builder.build();
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 8419437..606cfd8 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -32,6 +32,7 @@ import javax.annotation.Nonnull;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.Message;
 import com.stumbleupon.async.Callback;
+import org.apache.kudu.security.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -181,6 +182,12 @@ class RpcProxy {
             RpcHeader.RemoteMethodPB.newBuilder()
                 .setServiceName(rpc.serviceName())
                 .setMethodName(rpc.method()));
+    // Before we create the request, get an authz token if needed. This is done
+    // regardless of whether the KuduRpc object already has a token; we may be
+    // a retrying due to an invalid token and the client may have a new token.
+    if (rpc.needsAuthzToken()) {
+      rpc.bindAuthzToken(client.getAuthzToken(rpc.getTable().getTableId()));
+    }
     final Message reqPB = rpc.createRequestPB();
     // TODO(wdberkeley): We should enforce that every RPC has a timeout.
     if (rpc.timeoutTracker.hasTimeout()) {
@@ -225,7 +232,11 @@ class RpcProxy {
             connection.getServerInfo());
     if (ex != null) {
       if (ex instanceof InvalidAuthnTokenException) {
-        client.handleInvalidToken(rpc);
+        client.handleInvalidAuthnToken(rpc);
+        return;
+      }
+      if (ex instanceof InvalidAuthzTokenException) {
+        client.handleInvalidAuthzToken(rpc, ex);
         return;
       }
       if (ex instanceof RecoverableException) {
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
similarity index 60%
rename from java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java
rename to java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
index 7d332b0..c888bef 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.kudu.Schema;
+import org.apache.kudu.security.Token;
 import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
 import org.apache.kudu.test.KuduTestHarness;
 import org.apache.kudu.test.ClientTestUtil;
@@ -43,25 +44,30 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This test contains scenarios to verify that client re-acquires authn token upon expiration
- * of the current one and automatically retries the call.
+ * This test contains scenarios to verify that clients re-acquire tokens upon
+ * expiration of the current one and automatically retries affected calls.
  */
-public class TestAuthnTokenReacquire {
-  private static final Logger LOG = LoggerFactory.getLogger(TestAuthnTokenReacquire.class);
+public class TestAuthTokenReacquire {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAuthTokenReacquire.class);
 
-  private static final String TABLE_NAME = "TestAuthnTokenReacquire-table";
+  private static final String TABLE_NAME = "TestAuthTokenReacquire-table";
+
+  // Set a low token timeout.
   private static final int TOKEN_TTL_SEC = 1;
   private static final int OP_TIMEOUT_MS = 60 * TOKEN_TTL_SEC * 1000;
 
   private static final Schema basicSchema = ClientTestUtil.getBasicSchema();
 
-  // Inject additional INVALID_AUTHENTICATION_TOKEN responses from both the master and tablet
-  // servers, even for not-yet-expired tokens.
+  // Inject additional INVALID_AUTHENTICATION_TOKEN responses from both the
+  // master and tablet servers, even for not-yet-expired tokens.
   private static final MiniKuduClusterBuilder clusterBuilder = KuduTestHarness.getBaseClusterBuilder()
       .enableKerberos()
       .addMasterServerFlag(String.format("--authn_token_validity_seconds=%d", TOKEN_TTL_SEC))
+      .addMasterServerFlag(String.format("--authz_token_validity_seconds=%d", TOKEN_TTL_SEC))
       .addMasterServerFlag("--rpc_inject_invalid_authn_token_ratio=0.5")
-      .addTabletServerFlag("--rpc_inject_invalid_authn_token_ratio=0.5");
+      .addTabletServerFlag("--rpc_inject_invalid_authn_token_ratio=0.5")
+      .addTabletServerFlag("--tserver_enforce_access_control=true")
+      .addTabletServerFlag("--tserver_inject_invalid_authz_token_ratio=0.5");
 
   private KuduClient client;
   private AsyncKuduClient asyncClient;
@@ -81,11 +87,19 @@ public class TestAuthnTokenReacquire {
     }
   }
 
-  private void dropConnectionsAndExpireToken() throws InterruptedException {
+  private void dropConnectionsAndExpireTokens() throws InterruptedException {
     // Drop all connections from the client to Kudu servers.
     dropConnections();
-    // Wait for authn token expiration.
-    Thread.sleep(TOKEN_TTL_SEC * 1000);
+    // Wait for token expiration. Since we've just dropped all connections, this
+    // means that we'll need to get a new authn token upon sending the next RPC.
+    expireTokens();
+  }
+
+  private void expireTokens() throws InterruptedException {
+    // Sleep long enough for the authn/authz tokens to expire. Wait for just
+    // past the token TTL to avoid making this test flaky, e.g. in case the
+    // token just misses being considered expired.
+    Thread.sleep((TOKEN_TTL_SEC + 1) * 1000);
   }
 
   @Test
@@ -100,23 +114,23 @@ public class TestAuthnTokenReacquire {
         @Override
         @SuppressWarnings("AssertionFailureIgnored")
         public void run() {
-          final String tableName = "TestAuthnTokenReacquire-table-" + threadIdx;
+          final String tableName = "TestAuthTokenReacquire-table-" + threadIdx;
           try {
             ListTabletServersResponse response = client.listTabletServers();
             assertNotNull(response);
-            dropConnectionsAndExpireToken();
+            dropConnectionsAndExpireTokens();
 
             ListTablesResponse tableList = client.getTablesList(tableName);
             assertNotNull(tableList);
             assertTrue(tableList.getTablesList().isEmpty());
-            dropConnectionsAndExpireToken();
+            dropConnectionsAndExpireTokens();
 
             client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
-            dropConnectionsAndExpireToken();
+            dropConnectionsAndExpireTokens();
 
             KuduTable table = client.openTable(tableName);
             assertEquals(basicSchema.getColumnCount(), table.getSchema().getColumnCount());
-            dropConnectionsAndExpireToken();
+            dropConnectionsAndExpireTokens();
 
             client.deleteTable(tableName);
             assertFalse(client.tableExists(tableName));
@@ -126,7 +140,7 @@ public class TestAuthnTokenReacquire {
           }
         }
       });
-      thread.run();
+      thread.start();
       threads.add(thread);
     }
     for (Thread thread : threads) {
@@ -140,29 +154,65 @@ public class TestAuthnTokenReacquire {
     }
   }
 
+  private int countRowsInTable(KuduTable table) throws Exception {
+    AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(asyncClient, table)
+        .scanRequestTimeout(OP_TIMEOUT_MS)
+        .build();
+    return countRowsInScan(scanner);
+  }
+
+  private void insertRowWithKey(KuduSession session, KuduTable table, int key) throws Exception {
+    session.apply(createBasicSchemaInsert(table, key));
+    session.flush();
+    RowErrorsAndOverflowStatus errors = session.getPendingErrors();
+    assertFalse(errors.isOverflowed());
+    assertEquals(0, session.countPendingErrors());
+  }
+
+
   @Test
   public void testBasicWorkflow() throws Exception {
     KuduTable table = client.createTable(TABLE_NAME, basicSchema,
         getBasicCreateTableOptions());
-    dropConnectionsAndExpireToken();
+    String tableId = table.getTableId();
+    int key = 0;
 
+    // Drop all connections so the first write needs to reconnect with a new authn token.
+    Token.SignedTokenPB originalToken = asyncClient.securityContext.getAuthenticationToken();
+    dropConnectionsAndExpireTokens();
     KuduSession session = client.newSession();
     session.setTimeoutMillis(OP_TIMEOUT_MS);
-    session.apply(createBasicSchemaInsert(table, 1));
-    session.flush();
-    RowErrorsAndOverflowStatus errors = session.getPendingErrors();
-    assertFalse(errors.isOverflowed());
-    assertEquals(0, session.countPendingErrors());
-    dropConnectionsAndExpireToken();
+    insertRowWithKey(session, table, ++key);
 
-    KuduTable scanTable = client.openTable(TABLE_NAME);
-    AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(asyncClient, scanTable)
-        .scanRequestTimeout(OP_TIMEOUT_MS)
-        .build();
-    assertEquals(1, countRowsInScan(scanner));
-    dropConnectionsAndExpireToken();
+    // Verify that we got a different authn token.
+    assertFalse(asyncClient.securityContext.getAuthenticationToken().equals(originalToken));
 
+    // Now wait for the authz token to expire and do a write.
+    originalToken = asyncClient.getAuthzToken(tableId);
+    expireTokens();
+    insertRowWithKey(session, table, ++key);
+
+    // Verify that we got a different authz token.
+    assertFalse(asyncClient.getAuthzToken(tableId).equals(originalToken));
+
+    // Drop all connections so the first scan needs to reconnect with a new authn token.
+    originalToken = asyncClient.securityContext.getAuthenticationToken();
+    dropConnectionsAndExpireTokens();
+    KuduTable scanTable = client.openTable(TABLE_NAME);
+    assertEquals(key, countRowsInTable(scanTable));
+    assertFalse(asyncClient.securityContext.getAuthenticationToken().equals(originalToken));
+
+    // Now wait for the authz token to expire and do a scan.
+    originalToken = asyncClient.getAuthzToken(tableId);
+    expireTokens();
+    assertEquals(key, countRowsInTable(scanTable));
+    assertFalse(asyncClient.getAuthzToken(tableId).equals(originalToken));
+
+    // Force the client to get a new authn token and delete the table.
+    originalToken = asyncClient.securityContext.getAuthenticationToken();
+    dropConnectionsAndExpireTokens();
     client.deleteTable(TABLE_NAME);
     assertFalse(client.tableExists(TABLE_NAME));
+    assertFalse(asyncClient.securityContext.getAuthenticationToken().equals(originalToken));
   }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthzTokenCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthzTokenCache.java
new file mode 100644
index 0000000..0b229a9
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthzTokenCache.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.stumbleupon.async.Deferred;
+import org.apache.kudu.security.Token;
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
+import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestAuthzTokenCache {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAuthzTokenCache.class);
+
+  // This tests basic functionality of the authz token cache (e.g. putting
+  // things in, getting stuff out).
+  private static final MiniKuduCluster.MiniKuduClusterBuilder clusterBuilder =
+      KuduTestHarness.getBaseClusterBuilder()
+          .enableKerberos();
+
+  private static final String tableName = "TestAuthzTokenCache-table";
+
+  private KuduClient client;
+  private AsyncKuduClient asyncClient;
+
+  @Before
+  public void setUp() {
+    client = harness.getClient();
+    asyncClient = harness.getAsyncClient();
+  }
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness(clusterBuilder);
+
+  // Retrieves a new authz token from the master (regardless of whether there is
+  // already one in the authz token cache).
+  public void fetchAuthzToken(KuduTable table) throws Exception {
+    // Send a dummy RPC via the token cache. This will run a scan RPC
+    // after retrieving a new authz token.
+    AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(asyncClient, table)
+        .build();
+    KuduRpc<AsyncKuduScanner.Response> req = scanner.getOpenRequest();
+    Deferred<AsyncKuduScanner.Response> d = req.getDeferred();
+    asyncClient.getAuthzTokenCache().retrieveAuthzToken(req,
+        new InvalidAuthzTokenException(Status.IOError("test failure")));
+    assertNotNull(d.join());
+  }
+
+  @Test
+  public void testBasicAuthzTokenCache() throws Exception {
+    // First, do a sanity check that we get an authz token in the first place
+    // upon accessing a table.
+    final KuduTable table = client.createTable(tableName, getBasicSchema(),
+        getBasicCreateTableOptions());
+    AuthzTokenCache tokenCache = asyncClient.getAuthzTokenCache();
+    String tableId = table.getTableId();
+    Token.SignedTokenPB originalToken = asyncClient.getAuthzToken(tableId);
+    assertNotNull(originalToken);
+
+    // Wait a bit so the next token we get will be different. A different token
+    // will be generated every second by virtue of having a different
+    // expiration, which is in seconds.
+    Thread.sleep(1100);
+
+    // Send a dummy RPC via the token cache, sending it only after getting a new
+    // authz token.
+    fetchAuthzToken(table);
+
+    // Verify we actually got a new authz token.
+    assertFalse(asyncClient.getAuthzToken(tableId).equals(originalToken));
+
+    // Now put the original token directly in the cache.
+    tokenCache.put(tableId, originalToken);
+    assertTrue(asyncClient.getAuthzToken(tableId).equals(originalToken));
+  }
+
+  @Test
+  public void testRetrieveAuthzTokensInParallel() throws Exception {
+    final KuduTable table = client.createTable(tableName, getBasicSchema(),
+        getBasicCreateTableOptions());
+    final String tableId = table.getTableId();
+    class AuthzTokenFetcher implements Callable<Exception> {
+      @Override
+      public Exception call() {
+        try {
+          fetchAuthzToken(table);
+        } catch (Exception e) {
+          return e;
+        }
+        return null;
+      }
+    }
+    // Send a bunch of authz token requests in parallel.
+    final int NUM_THREADS = 30;
+    ArrayList<AuthzTokenFetcher> fetchers = new ArrayList<>();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      fetchers.add(new AuthzTokenFetcher());
+    }
+    int fails = 0;
+    final ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
+    List<Future<Exception>> exceptions = pool.invokeAll(fetchers);
+    pool.shutdown();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      Exception e = exceptions.get(i).get();
+      if (e != null) {
+        fails++;
+        e.printStackTrace();
+      }
+    }
+    assertEquals(0, fails);
+    // We should have gotten a token with all those retrievals, and sent a
+    // number of RPCs that was lower than the number of threads.
+    assertNotNull(asyncClient.getAuthzToken(tableId));
+    int numRetrievals = asyncClient.getAuthzTokenCache().numRetrievalsSent();
+    LOG.debug(String.format("Sent %d RPCs for %d threads", numRetrievals, NUM_THREADS));
+    assertTrue(0 < numRetrievals);
+    assertTrue(numRetrievals < NUM_THREADS);
+  }
+}
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultiMasterAuthzTokens.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultiMasterAuthzTokens.java
new file mode 100644
index 0000000..931692a
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultiMasterAuthzTokens.java
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.apache.kudu.client.SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
+import static org.apache.kudu.client.SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
+import static org.apache.kudu.test.ClientTestUtil.*;
+import static org.junit.Assert.assertEquals;
+
+public class TestMultiMasterAuthzTokens {
+  private static final MiniKuduCluster.MiniKuduClusterBuilder clusterBuilder =
+      KuduTestHarness.getBaseClusterBuilder()
+          .addMasterServerFlag("--authz_token_validity_seconds=1")
+          .addTabletServerFlag("--tserver_enforce_access_control=true")
+          // Inject invalid tokens such that operations will be forced to go
+          // back to the master for an authz token.
+          .addTabletServerFlag("--tserver_inject_invalid_authz_token_ratio=0.5");
+
+  private static final String tableName = "TestMultiMasterAuthzToken-table";
+
+  private KuduClient client;
+
+  @Before
+  public void setUp() {
+    client = harness.getClient();
+  }
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness(clusterBuilder);
+
+  /**
+   * Utility to send RPCs to add rows given the specified flush mode.
+   * Inserts rows with keys [startRow, endRow).
+   */
+  private void insertRows(KuduTable table, SessionConfiguration.FlushMode mode,
+                          int startRow, int endRow) throws Exception {
+    KuduSession session = client.newSession();
+    session.setFlushMode(mode);
+    for (int i = startRow; i < endRow; i++) {
+      Insert insert = createBasicSchemaInsert(table, i);
+      session.apply(insert);
+    }
+    session.flush();
+  }
+
+  /**
+   * Utility to send RPCs to add rows given the specified flush mode.
+   * Upserts rows with keys [startRow, endRow).
+   */
+  private void upsertRows(KuduTable table, SessionConfiguration.FlushMode mode,
+                          int startRow, int endRow) throws Exception {
+    KuduSession session = client.newSession();
+    session.setFlushMode(mode);
+    for (int i = startRow; i < endRow; i++) {
+      Upsert upsert = createBasicSchemaUpsert(table, i);
+      session.apply(upsert);
+    }
+    session.flush();
+  }
+
+  @Test
+  public void testAuthzTokensDuringElection() throws Exception {
+    // Test sending various requests that require authorization.
+    final KuduTable table = client.createTable(tableName, getBasicSchema(),
+        getBasicCreateTableOptions().setNumReplicas(1));
+
+    // Restart the masters to trigger an election.
+    harness.killAllMasterServers();
+    harness.startAllMasterServers();
+
+    final int NUM_REQS = 10;
+    insertRows(table, AUTO_FLUSH_SYNC, 0, NUM_REQS);
+
+    // Do the same for batches of inserts.
+    harness.killAllMasterServers();
+    harness.startAllMasterServers();
+    insertRows(table, AUTO_FLUSH_BACKGROUND, NUM_REQS, 2 * NUM_REQS);
+
+    // And for scans.
+    harness.killAllMasterServers();
+    harness.startAllMasterServers();
+    for (int i = 0; i < NUM_REQS; i++) {
+      assertEquals(2 * NUM_REQS, countRowsInTable(table));
+    }
+  }
+
+  @Test
+  public void testAuthzTokenExpiration() throws Exception {
+    // Test a long-running concurrent workload with different types of requests
+    // being sent, all the while injecting invalid tokens, with a short authz
+    // token expiration time. The threads should reacquire tokens as needed
+    // without surfacing token errors to the client.
+    final int TEST_RUNTIME_MS = 30000;
+    final KuduTable table = client.createTable(tableName, getBasicSchema(),
+        getBasicCreateTableOptions().setNumReplicas(1));
+    final CountDownLatch latch = new CountDownLatch(1);
+    final ExecutorService pool = Executors.newFixedThreadPool(3);
+    List<Future<Exception>> exceptions = new ArrayList<>();
+    exceptions.add(pool.submit(new Callable<Exception>() {
+      @Override
+      public Exception call() throws Exception {
+        try {
+          int batch = 0;
+          while (latch.getCount() > 0) {
+            // Send writes without batching.
+            upsertRows(table, AUTO_FLUSH_SYNC, batch * 10, (++batch) * 10);
+          }
+        } catch (Exception e) {
+          return e;
+        }
+        return null;
+      }
+    }));
+    exceptions.add(pool.submit(new Callable<Exception>() {
+      @Override
+      public Exception call() throws Exception {
+        try {
+          int batch = 0;
+          while (latch.getCount() > 0) {
+            // Also send writes with batching.
+            upsertRows(table, AUTO_FLUSH_BACKGROUND, batch * 10, (++batch) * 10);
+          }
+        } catch (Exception e) {
+          return e;
+        }
+        return null;
+      }
+    }));
+    exceptions.add(pool.submit(new Callable<Exception>() {
+      @Override
+      public Exception call() throws Exception {
+        try {
+          while (latch.getCount() > 0) {
+            // We can't guarantee a row count, but catch any exceptions.
+            countRowsInTable(table);
+          }
+        } catch (Exception e) {
+          return e;
+        }
+        return null;
+      }
+    }));
+    Thread.sleep(TEST_RUNTIME_MS);
+    latch.countDown();
+    int fails = 0;
+    for (Future<Exception> future : exceptions) {
+      Exception e = future.get();
+      if (e != null) {
+        e.printStackTrace();
+        fails++;
+      }
+    }
+    assertEquals(0, fails);
+  }
+}
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
index 9b67a9b..a93ee83 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
@@ -41,6 +41,7 @@ import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RowResult;
 import org.apache.kudu.client.RowResultIterator;
+import org.apache.kudu.client.Upsert;
 import org.apache.kudu.util.DecimalUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -290,6 +291,17 @@ public abstract class ClientTestUtil {
     session.close();
   }
 
+  public static Upsert createBasicSchemaUpsert(KuduTable table, int key) {
+    Upsert upsert = table.newUpsert();
+    PartialRow row = upsert.getRow();
+    row.addInt(0, key);
+    row.addInt(1, 3);
+    row.addInt(2, 4);
+    row.addString(3, "another string");
+    row.addBoolean(4, false);
+    return upsert;
+  }
+
   public static Insert createBasicSchemaInsert(KuduTable table, int key) {
     Insert insert = table.newInsert();
     PartialRow row = insert.getRow();


Mime
View raw message