hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [hbase] branch branch-2 updated: HBASE-23347 Allow custom authentication methods for RPCs
Date Thu, 16 Jan 2020 16:13:44 GMT
This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new bef1eb3  HBASE-23347 Allow custom authentication methods for RPCs
bef1eb3 is described below

commit bef1eb33f41652eb699e8d608af5e67a7127605b
Author: Josh Elser <elserj@apache.org>
AuthorDate: Thu Jan 16 10:27:28 2020 -0500

    HBASE-23347 Allow custom authentication methods for RPCs
    
    Decouple the HBase internals such that someone can implement
    their own SASL-based authentication mechanism and plug it into
    HBase RegionServers/Masters.
    
    Comes with a design doc in dev-support/design-docs and an example in
    hbase-examples known as "Shade" which uses a flat-password file
    for authenticating users.
    
    Closes #884
    
    Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
    Signed-off-by: Andrew Purtell <apurtell@apache.org>
    Signed-off-by: Reid Chan <reidchan@apache.org>
---
 .../HBASE-23347-pluggable-authentication.md        | 179 +++++++
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java |  14 -
 .../hadoop/hbase/ipc/BlockingRpcConnection.java    |  85 ++--
 .../hadoop/hbase/ipc/NettyRpcConnection.java       |  10 +-
 .../org/apache/hadoop/hbase/ipc/RpcConnection.java | 157 ++----
 .../hbase/security/AbstractHBaseSaslRpcClient.java | 139 +-----
 .../hadoop/hbase/security/HBaseSaslRpcClient.java  |  24 +-
 .../hbase/security/NettyHBaseSaslRpcClient.java    |  14 +-
 .../security/NettyHBaseSaslRpcClientHandler.java   |  12 +-
 .../org/apache/hadoop/hbase/security/SaslUtil.java |   6 +-
 .../AbstractSaslClientAuthenticationProvider.java  |  59 +++
 .../provider/AuthenticationProviderSelector.java   |  49 ++
 .../security/provider/BuiltInProviderSelector.java | 134 +++++
 .../BuiltInSaslAuthenticationProvider.java         |  39 ++
 .../provider/DigestSaslAuthenticationProvider.java |  36 ++
 .../DigestSaslClientAuthenticationProvider.java    | 107 ++++
 .../provider/GssSaslAuthenticationProvider.java    |  36 ++
 .../GssSaslClientAuthenticationProvider.java       | 107 ++++
 .../hbase/security/provider/SaslAuthMethod.java    |  96 ++++
 .../provider/SaslAuthenticationProvider.java       |  46 ++
 .../provider/SaslClientAuthenticationProvider.java |  93 ++++
 .../SaslClientAuthenticationProviders.java         | 220 +++++++++
 .../provider/SimpleSaslAuthenticationProvider.java |  35 ++
 .../SimpleSaslClientAuthenticationProvider.java    |  58 +++
 ...urity.provider.SaslClientAuthenticationProvider |  18 +
 .../hbase/security/TestHBaseSaslRpcClient.java     |  92 ++--
 .../provider/TestDefaultProviderSelector.java      |  83 ++++
 .../TestSaslClientAuthenticationProviders.java     | 139 ++++++
 .../hadoop/hbase/HBaseInterfaceAudience.java       |   5 +
 hbase-examples/pom.xml                             |  22 +
 .../security/provider/example/SaslPlainServer.java | 164 +++++++
 .../provider/example/ShadeClientTokenUtil.java     |  44 ++
 .../provider/example/ShadeProviderSelector.java    |  66 +++
 .../example/ShadeSaslAuthenticationProvider.java   |  38 ++
 .../ShadeSaslClientAuthenticationProvider.java     | 108 +++++
 .../ShadeSaslServerAuthenticationProvider.java     | 193 ++++++++
 .../provider/example/ShadeTokenIdentifier.java     |  66 +++
 ...rg.apache.hadoop.security.token.TokenIdentifier |  18 +
 .../TestShadeSaslAuthenticationProvider.java       | 245 ++++++++++
 hbase-examples/src/test/resources/log4j.properties |   1 +
 hbase-server/pom.xml                               |   6 +
 .../hadoop/hbase/ipc/ServerRpcConnection.java      |  80 ++-
 .../hadoop/hbase/security/HBaseSaslRpcServer.java  | 166 +------
 .../AttemptingUserProvidingSaslServer.java         |  53 ++
 .../DigestSaslServerAuthenticationProvider.java    | 158 ++++++
 .../GssSaslServerAuthenticationProvider.java       | 115 +++++
 .../provider/SaslServerAuthenticationProvider.java |  54 +++
 .../SaslServerAuthenticationProviders.java         | 191 ++++++++
 .../SimpleSaslServerAuthenticationProvider.java    |  51 ++
 ...urity.provider.SaslServerAuthenticationProvider |  19 +
 .../TestCustomSaslAuthenticationProvider.java      | 539 +++++++++++++++++++++
 .../TestSaslServerAuthenticationProviders.java     | 149 ++++++
 hbase-server/src/test/resources/log4j.properties   |   1 +
 53 files changed, 4102 insertions(+), 537 deletions(-)

diff --git a/dev-support/design-docs/HBASE-23347-pluggable-authentication.md b/dev-support/design-docs/HBASE-23347-pluggable-authentication.md
new file mode 100644
index 0000000..f6c7203
--- /dev/null
+++ b/dev-support/design-docs/HBASE-23347-pluggable-authentication.md
@@ -0,0 +1,179 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Pluggable Authentication for HBase RPCs
+
+## Background
+
+As a distributed database, HBase must be able to authenticate users and HBase
+services across an untrusted network. Clients and HBase services are treated
+equivalently in terms of authentication (and this is the only time we will
+draw such a distinction).
+
+There are currently three modes of authentication which are supported by HBase
+today via the configuration property `hbase.security.authentication`:
+
+1. `SIMPLE`
+2. `KERBEROS`
+3. `TOKEN`
+
+`SIMPLE` authentication is effectively no authentication; HBase assumes the user
+is who they claim to be. `KERBEROS` authenticates clients via the KerberosV5
+protocol using the GSSAPI mechanism of the Java Simple Authentication and Security
+Layer (SASL) protocol. `TOKEN` is a username-password based authentication protocol
+which uses short-lived passwords that can only be obtained via a `KERBEROS` authenticated
+request. `TOKEN` authentication is synonymous with Hadoop-style [Delegation Tokens](https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/sections/hadoop_tokens.html#delegation-tokens). `TOKEN` authentication uses the `DIGEST-MD5`
+SASL mechanism.
+
+[SASL](https://docs.oracle.com/javase/8/docs/technotes/guides/security/sasl/sasl-refguide.html)
+is a library which specifies a network protocol that can authenticate a client
+and a server using an arbitrary mechanism. SASL ships with a [number of mechanisms](https://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml)
+out of the box and it is possible to implement custom mechanisms. SASL is effectively
+decoupling an RPC client-server model from the mechanism used to authenticate those
+requests (e.g. the RPC code is identical whether username-password, Kerberos, or any
+other method is used to authenticate the request).
+
+RFCs define what [SASL mechanisms exist](https://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xml),
+but what the RFCs define is a superset of the mechanisms that are
+[implemented in Java](https://docs.oracle.com/javase/8/docs/technotes/guides/security/sasl/sasl-refguide.html#SUN).
+This document limits discussion to SASL mechanisms in the abstract, focusing on those which are well-defined and
+implemented in Java today by the JDK itself. However, it is completely possible that a developer can implement
+and register their own SASL mechanism. Writing a custom mechanism is outside of the scope of this document, but
+not outside of the realm of possibility.
+
+The `SIMPLE` implementation does not use SASL, but instead has its own RPC logic
+built into the HBase RPC protocol. `KERBEROS` and `TOKEN` both use SASL to authenticate,
+relying on the `Token` interface that is intertwined with the Hadoop `UserGroupInformation`
+class. SASL decouples an RPC from the mechanism used to authenticate that request.
+
+## Problem statement
+
+Despite HBase already shipping authentication implementations which leverage SASL,
+it is (effectively) impossible to add a new authentication implementation to HBase. The
+use of the `org.apache.hadoop.hbase.security.AuthMethod` enum makes it impossible
+to define a new method of authentication. Also, the RPC implementation is written
+to only use the methods that are expressly shipped in HBase. Adding a new authentication
+method would require copying and modifying the RpcClient implementation, in addition
+to modifying the RpcServer to invoke the correct authentication check.
+
+While it is possible to add a new authentication method to HBase, it cannot be done
+cleanly or sustainably. This is what is meant by "impossible".
+
+## Proposal
+
+HBase should expose interfaces which allow for pluggable authentication mechanisms
+such that HBase can authenticate against external systems. Because the RPC implementation
+can already support SASL, HBase can standardize on SASL, allowing any authentication method
+which is capable of using SASL to negotiate authentication. `KERBEROS` and `TOKEN` methods
+will naturally fit into these new interfaces, but `SIMPLE` authentication will not (see the following
+section for a tangent on `SIMPLE` authentication today).
+
+### Tangent: on SIMPLE authentication
+
+`SIMPLE` authentication in HBase today is treated as a special case. My impression is that
+this stems from HBase not originally shipping an RPC solution that had any authentication.
+
+Re-implementing `SIMPLE` authentication such that it also flows through SASL (e.g. via
+the `PLAIN` SASL mechanism) would simplify the HBase codebase such that all authentication
+occurs via SASL. This was not done for the initial implementation to reduce the scope
+of the changeset. Changing `SIMPLE` authentication to use SASL may result in some
+performance impact in setting up a new RPC. The same conditional
+`if (sasl) ... else SIMPLE` logic is propagated in this implementation.
+
+## Implementation Overview
+
+HBASE-23347 includes a refactoring of HBase RPC authentication where all current methods
+are ported to a new set of interfaces, and all RPC implementations are updated to use
+the new interfaces. In the spirit of SASL, the expectation is that users can provide
+their own authentication methods at runtime, and HBase should be capable of negotiating with
+a client who tries to authenticate via that custom authentication method. The implementation
+refers to this "bundle" of client and server logic as an "authentication provider".
+
+### Providers
+
+One authentication provider includes the following pieces:
+
+1. Client-side logic (providing a credential)
+2. Server-side logic (validating a credential from a client)
+3. Client selection logic to choose a provider (from many that may be available)
+
+A provider's client and server side logic are considered to be one-to-one. A `Foo` client-side provider
+should never be used to authenticate against a `Bar` server-side provider.
+
+We do expect that both clients and servers will have access to multiple providers. A server may
+be capable of authenticating via methods which a client is unaware of. A client may attempt to authenticate
+against a server using a method which the server does not know how to process. In both cases, the RPC
+should fail when a client and server do not have matching providers. The server identifies
+client authentication mechanisms via a `byte authCode` (which is already sent today with HBase RPCs).
+
+A client may also have multiple providers available for it to use in authenticating against
+HBase. The client must have some logic to select which provider to use. Because we are
+allowing custom providers, we must also allow a custom selection logic such that the
+correct provider can be chosen. This is a formalization of the logic already present
+in `org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector`.
+
+To enable the above, we have some new interfaces to support the user extensibility:
+
+1. `interface SaslAuthenticationProvider`
+2. `interface SaslClientAuthenticationProvider extends SaslAuthenticationProvider`
+3. `interface SaslServerAuthenticationProvider extends SaslAuthenticationProvider`
+4. `interface AuthenticationProviderSelector`
+
+The `SaslAuthenticationProvider` shares logic which is common to the client and the
+server (though it is up to the developer to guarantee this). The client and server
+interfaces each have logic specific to the HBase RPC client and HBase RPC server
+codebase, as their name implies. As described above, an implementation
+of one `SaslClientAuthenticationProvider` must match exactly one implementation of
+`SaslServerAuthenticationProvider`. Each Authentication Provider implementation is
+a singleton and is intended to be shared across all RPCs. A provider selector is
+chosen per client based on that client's configuration.
+
+A client authentication provider is uniquely identified among other providers
+by the following characteristics:
+
+1. A name, e.g. "KERBEROS", "TOKEN"
+2. A byte (a value between 0 and 255)
+
+In addition to these attributes, a provider also must define the following attributes:
+
+3. The SASL mechanism being used.
+4. The Hadoop AuthenticationMethod, e.g. "TOKEN", "KERBEROS", "CERTIFICATE"
+5. The Token "kind", the name used to identify a TokenIdentifier, e.g. `HBASE_AUTH_TOKEN`
+
+It is allowed (even expected) that there may be multiple providers that use `TOKEN` authentication.
+
+N.b. Hadoop requires all `TokenIdentifier` implementations to have a no-args constructor and a `ServiceLoader`
+entry in their packaging JAR file (e.g. `META-INF/services/org.apache.hadoop.security.token.TokenIdentifier`).
+Otherwise, parsing the `TokenIdentifier` on the server-side end of an RPC from a Hadoop `Token` will return
+`null` to the caller (often, in the `CallbackHandler` implementation).
+
+### Factories
+
+To ease development with this unknown set of providers, there are two classes which
+find, instantiate, and cache the provider singletons.
+
+1. Client side: `class SaslClientAuthenticationProviders`
+2. Server side: `class SaslServerAuthenticationProviders`
+
+These classes use [Java ServiceLoader](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html)
+to find implementations available on the classpath. The providers that HBase ships
+for the three out-of-the-box authentication methods all register themselves via the `ServiceLoader`.
+
+Each class also enables providers to be added via explicit configuration in hbase-site.xml.
+This enables unit tests to define custom implementations that may be toy/naive/unsafe without
+any worry that these may be inadvertently deployed onto a production HBase cluster.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 629efe6..d418720 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -41,8 +41,6 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -58,17 +56,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 
@@ -104,14 +98,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
   private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
       .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT",
-      justification="the rest of the system which live in the different package can use")
-  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>();
-
-  static {
-    TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector());
-  }
-
   protected boolean running = true; // if client runs
 
   protected final Configuration conf;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index c5b1e57..99708e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
 import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
+import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -361,9 +362,10 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
 
   private boolean setupSaslConnection(final InputStream in2, final OutputStream out2)
       throws IOException {
-    saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal,
-        this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection",
-          QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
+    saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
+        serverAddress, securityInfo, this.rpcClient.fallbackAllowed,
+        this.rpcClient.conf.get("hbase.rpc.protection",
+            QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
         this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
     return saslRpcClient.saslConnect(in2, out2);
   }
@@ -375,11 +377,10 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
    * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
    * attempted.
    * <p>
-   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
-   * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
-   * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
-   * underlying authentication implementation, so there is no retry from other high level (for eg,
-   * HCM or HBaseAdmin).
+   * The retry logic is governed by the {@link SaslClientAuthenticationProvider#canRetry()}
+   * method. Some providers have the ability to obtain new credentials and then re-attempt to
+   * authenticate with HBase services. Other providers will continue to fail if they failed the
+   * first time -- for those, we want to fail-fast.
    * </p>
    */
   private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
@@ -389,40 +390,44 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
     user.doAs(new PrivilegedExceptionAction<Object>() {
       @Override
       public Object run() throws IOException, InterruptedException {
-        if (shouldAuthenticateOverKrb()) {
-          if (currRetries < maxRetries) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Exception encountered while connecting to " +
-                "the server : " + StringUtils.stringifyException(ex));
-            }
-            // try re-login
-            relogin();
-            disposeSasl();
-            // have granularity of milliseconds
-            // we are sleeping with the Connection lock held but since this
-            // connection instance is being used for connecting to the server
-            // in question, it is okay
-            Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
-            return null;
-          } else {
-            String msg = "Couldn't setup connection for "
-                + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
-            LOG.warn(msg, ex);
-            throw new IOException(msg, ex);
+        // A provider which failed authentication, but doesn't have the ability to relogin with
+        // some external system (e.g. username/password, the password either works or it doesn't)
+        if (!provider.canRetry()) {
+          LOG.warn("Exception encountered while connecting to the server : " + ex);
+          if (ex instanceof RemoteException) {
+            throw (RemoteException) ex;
           }
-        } else {
-          LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
-        }
-        if (ex instanceof RemoteException) {
-          throw (RemoteException) ex;
+          if (ex instanceof SaslException) {
+            String msg = "SASL authentication failed."
+                + " The most likely cause is missing or invalid credentials.";
+            throw new RuntimeException(msg, ex);
+          }
+          throw new IOException(ex);
         }
-        if (ex instanceof SaslException) {
-          String msg = "SASL authentication failed."
-              + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
-          LOG.error(HBaseMarkers.FATAL, msg, ex);
-          throw new RuntimeException(msg, ex);
+
+        // Other providers, like kerberos, could request a new ticket from a keytab. Let
+        // them try again.
+        if (currRetries < maxRetries) {
+          LOG.debug("Exception encountered while connecting to the server", ex);
+
+          // Invoke the provider to perform the relogin
+          provider.relogin();
+
+          // Get rid of any old state on the SaslClient
+          disposeSasl();
+
+          // have granularity of milliseconds
+          // we are sleeping with the Connection lock held but since this
+          // connection instance is being used for connecting to the server
+          // in question, it is okay
+          Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
+          return null;
+        } else {
+          String msg = "Failed to initiate connection for "
+              + UserGroupInformation.getLoginUser().getUserName() + " to "
+              + securityInfo.getServerPrincipal();
+          throw new IOException(msg, ex);
         }
-        throw new IOException(ex);
       }
     });
   }
@@ -459,7 +464,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
         if (useSasl) {
           final InputStream in2 = inStream;
           final OutputStream out2 = outStream;
-          UserGroupInformation ticket = getUGI();
+          UserGroupInformation ticket = provider.getRealUser(remoteId.ticket);
           boolean continueSasl;
           if (ticket == null) {
             throw new FatalConnectionException("ticket/user is null");
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index b8620b1..7d91fd9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -159,8 +159,8 @@ class NettyRpcConnection extends RpcConnection {
         @Override
         public void run() {
           try {
-            if (shouldAuthenticateOverKrb()) {
-              relogin();
+            if (provider.canRetry()) {
+              provider.relogin();
             }
           } catch (IOException e) {
             LOG.warn("Relogin failed", e);
@@ -183,7 +183,7 @@ class NettyRpcConnection extends RpcConnection {
   }
 
   private void saslNegotiate(final Channel ch) {
-    UserGroupInformation ticket = getUGI();
+    UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
     if (ticket == null) {
       failInit(ch, new FatalConnectionException("ticket/user is null"));
       return;
@@ -191,8 +191,8 @@ class NettyRpcConnection extends RpcConnection {
     Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
     final NettyHBaseSaslRpcClientHandler saslHandler;
     try {
-      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token,
-          serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf);
+      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
+          serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
     } catch (IOException e) {
       failInit(ch, e);
       return;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index 1fffebb..195a16d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -17,34 +17,33 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-import org.apache.hbase.thirdparty.io.netty.util.Timeout;
-import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
-
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
+import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
 
 /**
  * Base class for ipc connection.
@@ -56,13 +55,13 @@ abstract class RpcConnection {
 
   protected final ConnectionId remoteId;
 
-  protected final AuthMethod authMethod;
-
   protected final boolean useSasl;
 
   protected final Token<? extends TokenIdentifier> token;
 
-  protected final String serverPrincipal; // server's krb5 principal name
+  protected final InetAddress serverAddress;
+
+  protected final SecurityInfo securityInfo;
 
   protected final int reloginMaxBackoff; // max pause before relogin on sasl failure
 
@@ -81,115 +80,53 @@ abstract class RpcConnection {
   // the last time we were picked up from connection pool.
   protected long lastTouched;
 
+  protected SaslClientAuthenticationProvider provider;
+
   protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId,
       String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor)
       throws IOException {
     if (remoteId.getAddress().isUnresolved()) {
       throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
     }
+    this.serverAddress = remoteId.getAddress().getAddress();
     this.timeoutTimer = timeoutTimer;
     this.codec = codec;
     this.compressor = compressor;
     this.conf = conf;
 
-    UserGroupInformation ticket = remoteId.getTicket().getUGI();
-    SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
+    User ticket = remoteId.getTicket();
+    this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
     this.useSasl = isSecurityEnabled;
-    Token<? extends TokenIdentifier> token = null;
-    String serverPrincipal = null;
+
+    // Choose the correct Token and AuthenticationProvider for this client to use
+    SaslClientAuthenticationProviders providers =
+        SaslClientAuthenticationProviders.getInstance(conf);
+    Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair;
     if (useSasl && securityInfo != null) {
-      AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
-      if (tokenKind != null) {
-        TokenSelector<? extends TokenIdentifier> tokenSelector = AbstractRpcClient.TOKEN_HANDLERS
-            .get(tokenKind);
-        if (tokenSelector != null) {
-          token = tokenSelector.selectToken(new Text(clusterId), ticket.getTokens());
-        } else if (LOG.isDebugEnabled()) {
-          LOG.debug("No token selector found for type " + tokenKind);
+      pair = providers.selectProvider(clusterId, ticket);
+      if (pair == null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Found no valid authentication method from providers={} with tokens={}",
+              providers.toString(), ticket.getTokens());
         }
+        throw new RuntimeException("Found no valid authentication method from options");
       }
-      String serverKey = securityInfo.getServerPrincipal();
-      if (serverKey == null) {
-        throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
-      }
-      serverPrincipal = SecurityUtil.getServerPrincipal(conf.get(serverKey),
-        remoteId.address.getAddress().getCanonicalHostName().toLowerCase());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RPC Server Kerberos principal name for service=" + remoteId.getServiceName()
-            + " is " + serverPrincipal);
-      }
-    }
-    this.token = token;
-    this.serverPrincipal = serverPrincipal;
-    if (!useSasl) {
-      authMethod = AuthMethod.SIMPLE;
-    } else if (token != null) {
-      authMethod = AuthMethod.DIGEST;
+    } else if (!useSasl) {
+      // Hack, while SIMPLE doesn't go via SASL.
+      pair = providers.getSimpleProvider();
     } else {
-      authMethod = AuthMethod.KERBEROS;
+      throw new RuntimeException("Could not compute valid client authentication provider");
     }
 
-    // Log if debug AND non-default auth, else if trace enabled.
-    // No point logging obvious.
-    if ((LOG.isDebugEnabled() && !authMethod.equals(AuthMethod.SIMPLE)) ||
-        LOG.isTraceEnabled()) {
-      // Only log if not default auth.
-      LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName
-          + ", sasl=" + useSasl);
-    }
+    this.provider = pair.getFirst();
+    this.token = pair.getSecond();
+
+    LOG.debug("Using {} authentication for service={}, sasl={}",
+        provider.getSaslAuthMethod().getName(), remoteId.serviceName, useSasl);
     reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
     this.remoteId = remoteId;
   }
 
-  private UserInformation getUserInfo(UserGroupInformation ugi) {
-    if (ugi == null || authMethod == AuthMethod.DIGEST) {
-      // Don't send user for token auth
-      return null;
-    }
-    UserInformation.Builder userInfoPB = UserInformation.newBuilder();
-    if (authMethod == AuthMethod.KERBEROS) {
-      // Send effective user for Kerberos auth
-      userInfoPB.setEffectiveUser(ugi.getUserName());
-    } else if (authMethod == AuthMethod.SIMPLE) {
-      // Send both effective user and real user for simple auth
-      userInfoPB.setEffectiveUser(ugi.getUserName());
-      if (ugi.getRealUser() != null) {
-        userInfoPB.setRealUser(ugi.getRealUser().getUserName());
-      }
-    }
-    return userInfoPB.build();
-  }
-
-  protected UserGroupInformation getUGI() {
-    UserGroupInformation ticket = remoteId.getTicket().getUGI();
-    if (authMethod == AuthMethod.KERBEROS) {
-      if (ticket != null && ticket.getRealUser() != null) {
-        ticket = ticket.getRealUser();
-      }
-    }
-    return ticket;
-  }
-
-  protected boolean shouldAuthenticateOverKrb() throws IOException {
-    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-    UserGroupInformation realUser = currentUser.getRealUser();
-    return authMethod == AuthMethod.KERBEROS && loginUser != null &&
-    // Make sure user logged in using Kerberos either keytab or TGT
-        loginUser.hasKerberosCredentials() &&
-        // relogin only in case it is the login user (e.g. JT)
-        // or superuser (like oozie).
-        (loginUser.equals(currentUser) || loginUser.equals(realUser));
-  }
-
-  protected void relogin() throws IOException {
-    if (UserGroupInformation.isLoginKeytabBased()) {
-      UserGroupInformation.getLoginUser().reloginFromKeytab();
-    } else {
-      UserGroupInformation.getLoginUser().reloginFromTicketCache();
-    }
-  }
-
   protected void scheduleTimeoutTask(final Call call) {
     if (call.timeout > 0) {
       call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() {
@@ -216,16 +153,16 @@ abstract class RpcConnection {
     System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
     preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
     synchronized (this) {
-      preamble[rpcHeaderLen + 1] = authMethod.code;
+      preamble[rpcHeaderLen + 1] = provider.getSaslAuthMethod().getCode();
     }
     return preamble;
   }
 
   protected ConnectionHeader getConnectionHeader() {
-    ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
+    final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
     builder.setServiceName(remoteId.getServiceName());
-    UserInformation userInfoPB;
-    if ((userInfoPB = getUserInfo(remoteId.ticket.getUGI())) != null) {
+    final UserInformation userInfoPB  = provider.getUserInfo(remoteId.ticket);
+    if (userInfoPB != null) {
       builder.setUserInfo(userInfoPB);
     }
     if (this.codec != null) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java
index 94e5d5d..b1f0861 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java
@@ -17,27 +17,18 @@
  */
 package org.apache.hadoop.hbase.security;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.Map;
 
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
-import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A utility class that encapsulates SASL logic for RPC client. Copied from
@@ -46,9 +37,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
  */
 @InterfaceAudience.Private
 public abstract class AbstractHBaseSaslRpcClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractHBaseSaslRpcClient.class);
-
   private static final byte[] EMPTY_TOKEN = new byte[0];
 
   protected final SaslClient saslClient;
@@ -59,73 +47,47 @@ public abstract class AbstractHBaseSaslRpcClient {
 
   /**
    * Create a HBaseSaslRpcClient for an authentication method
-   * @param method the requested authentication method
+   * @param conf the configuration object
+   * @param provider the authentication provider
    * @param token token to use if needed by the authentication method
-   * @param serverPrincipal the server principal that we are trying to set the connection up to
+   * @param serverAddr the address of the hbase service
+   * @param securityInfo the security details for the remote hbase service
    * @param fallbackAllowed does the client allow fallback to simple authentication
    * @throws IOException
    */
-  protected AbstractHBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
-      String serverPrincipal, boolean fallbackAllowed) throws IOException {
-    this(method, token, serverPrincipal, fallbackAllowed, "authentication");
+  protected AbstractHBaseSaslRpcClient(Configuration conf,
+      SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token,
+      InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed)
+          throws IOException {
+    this(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, "authentication");
   }
 
   /**
    * Create a HBaseSaslRpcClient for an authentication method
-   * @param method the requested authentication method
+   * @param conf configuration object
+   * @param provider the authentication provider
    * @param token token to use if needed by the authentication method
-   * @param serverPrincipal the server principal that we are trying to set the connection up to
+   * @param serverAddr the address of the hbase service
+   * @param securityInfo the security details for the remote hbase service
    * @param fallbackAllowed does the client allow fallback to simple authentication
    * @param rpcProtection the protection level ("authentication", "integrity" or "privacy")
    * @throws IOException
    */
-  protected AbstractHBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
-      String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException {
+  protected AbstractHBaseSaslRpcClient(Configuration conf,
+      SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token,
+      InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed,
+      String rpcProtection) throws IOException {
     this.fallbackAllowed = fallbackAllowed;
     saslProps = SaslUtil.initSaslProperties(rpcProtection);
-    switch (method) {
-      case DIGEST:
-        if (LOG.isDebugEnabled()) LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
-            + " client to authenticate to service at " + token.getService());
-        saslClient = createDigestSaslClient(new String[] { AuthMethod.DIGEST.getMechanismName() },
-          SaslUtil.SASL_DEFAULT_REALM, new SaslClientCallbackHandler(token));
-        break;
-      case KERBEROS:
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
-              + " client. Server's Kerberos principal name is " + serverPrincipal);
-        }
-        if (serverPrincipal == null || serverPrincipal.length() == 0) {
-          throw new IOException("Failed to specify server's Kerberos principal name");
-        }
-        String[] names = SaslUtil.splitKerberosName(serverPrincipal);
-        if (names.length != 3) {
-          throw new IOException(
-              "Kerberos principal does not have the expected format: " + serverPrincipal);
-        }
-        saslClient = createKerberosSaslClient(
-          new String[] { AuthMethod.KERBEROS.getMechanismName() }, names[0], names[1]);
-        break;
-      default:
-        throw new IOException("Unknown authentication method " + method);
-    }
+
+    saslClient = provider.createClient(
+        conf, serverAddr, securityInfo, token, fallbackAllowed, saslProps);
     if (saslClient == null) {
-      throw new IOException("Unable to find SASL client implementation");
+      throw new IOException("Authentication provider " + provider.getClass()
+          + " returned a null SaslClient");
     }
   }
 
-  protected SaslClient createDigestSaslClient(String[] mechanismNames, String saslDefaultRealm,
-      CallbackHandler saslClientCallbackHandler) throws IOException {
-    return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, saslProps,
-      saslClientCallbackHandler);
-  }
-
-  protected SaslClient createKerberosSaslClient(String[] mechanismNames, String userFirstPart,
-      String userSecondPart) throws IOException {
-    return Sasl.createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, saslProps,
-      null);
-  }
-
   public byte[] getInitialResponse() throws SaslException {
     if (saslClient.hasInitialResponse()) {
       return saslClient.evaluateChallenge(EMPTY_TOKEN);
@@ -146,53 +108,4 @@ public abstract class AbstractHBaseSaslRpcClient {
   public void dispose() {
     SaslUtil.safeDispose(saslClient);
   }
-
-  @VisibleForTesting
-  static class SaslClientCallbackHandler implements CallbackHandler {
-    private final String userName;
-    private final char[] userPassword;
-
-    public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
-      this.userName = SaslUtil.encodeIdentifier(token.getIdentifier());
-      this.userPassword = SaslUtil.encodePassword(token.getPassword());
-    }
-
-    @Override
-    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
-      NameCallback nc = null;
-      PasswordCallback pc = null;
-      RealmCallback rc = null;
-      for (Callback callback : callbacks) {
-        if (callback instanceof RealmChoiceCallback) {
-          continue;
-        } else if (callback instanceof NameCallback) {
-          nc = (NameCallback) callback;
-        } else if (callback instanceof PasswordCallback) {
-          pc = (PasswordCallback) callback;
-        } else if (callback instanceof RealmCallback) {
-          rc = (RealmCallback) callback;
-        } else {
-          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
-        }
-      }
-      if (nc != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("SASL client callback: setting username: " + userName);
-        }
-        nc.setName(userName);
-      }
-      if (pc != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("SASL client callback: setting userPassword");
-        }
-        pc.setPassword(userPassword);
-      }
-      if (rc != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("SASL client callback: setting realm: " + rc.getDefaultText());
-        }
-        rc.setText(rc.getDefaultText());
-      }
-    }
-  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
index 37d3cdd..94e7b7e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
@@ -27,16 +27,15 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
+import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RemoteException;
@@ -44,6 +43,9 @@ import org.apache.hadoop.security.SaslInputStream;
 import org.apache.hadoop.security.SaslOutputStream;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A utility class that encapsulates SASL logic for RPC client. Copied from
@@ -61,15 +63,17 @@ public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
   private OutputStream cryptoOutputStream;
   private boolean initStreamForCrypto;
 
-  public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
-      String serverPrincipal, boolean fallbackAllowed) throws IOException {
-    super(method, token, serverPrincipal, fallbackAllowed);
+  public HBaseSaslRpcClient(Configuration conf, SaslClientAuthenticationProvider provider,
+      Token<? extends TokenIdentifier> token, InetAddress serverAddr, SecurityInfo securityInfo,
+      boolean fallbackAllowed) throws IOException {
+    super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed);
   }
 
-  public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
-      String serverPrincipal, boolean fallbackAllowed, String rpcProtection,
-      boolean initStreamForCrypto) throws IOException {
-    super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
+  public HBaseSaslRpcClient(Configuration conf, SaslClientAuthenticationProvider provider,
+      Token<? extends TokenIdentifier> token, InetAddress serverAddr, SecurityInfo securityInfo,
+      boolean fallbackAllowed, String rpcProtection, boolean initStreamForCrypto)
+          throws IOException {
+    super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, rpcProtection);
     this.initStreamForCrypto = initStreamForCrypto;
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
index 6771e32..a5b9803 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
@@ -21,14 +21,17 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
 import java.io.IOException;
+import java.net.InetAddress;
 
 import javax.security.sasl.Sasl;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 
 /**
  * Implement SASL logic for netty rpc client.
@@ -38,9 +41,10 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
   private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClient.class);
 
-  public NettyHBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
-      String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException {
-    super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
+  public NettyHBaseSaslRpcClient(Configuration conf, SaslClientAuthenticationProvider provider,
+      Token<? extends TokenIdentifier> token, InetAddress serverAddr, SecurityInfo securityInfo,
+      boolean fallbackAllowed, String rpcProtection) throws IOException {
+    super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, rpcProtection);
   }
 
   public void setupSaslHandler(ChannelPipeline p) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
index eb4f205..e011cc6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
@@ -24,6 +24,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
+import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -60,14 +62,14 @@ public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
    *          simple.
    */
   public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
-      AuthMethod method, Token<? extends TokenIdentifier> token, String serverPrincipal,
-      boolean fallbackAllowed, Configuration conf)
-      throws IOException {
+      SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token,
+      InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed,
+      Configuration conf) throws IOException {
     this.saslPromise = saslPromise;
     this.ugi = ugi;
     this.conf = conf;
-    this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, serverPrincipal,
-        fallbackAllowed, conf.get(
+    this.saslRpcClient = new NettyHBaseSaslRpcClient(conf, provider, token, serverAddr,
+        securityInfo, fallbackAllowed, conf.get(
         "hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
index 72851b6..ad2067f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
@@ -68,15 +68,15 @@ public class SaslUtil {
     return fullName.split("[/@]");
   }
 
-  static String encodeIdentifier(byte[] identifier) {
+  public static String encodeIdentifier(byte[] identifier) {
     return Base64.getEncoder().encodeToString(identifier);
   }
 
-  static byte[] decodeIdentifier(String identifier) {
+  public static byte[] decodeIdentifier(String identifier) {
     return Base64.getDecoder().decode(Bytes.toBytes(identifier));
   }
 
-  static char[] encodePassword(byte[] password) {
+  public static char[] encodePassword(byte[] password) {
     return Base64.getEncoder().encodeToString(password).toCharArray();
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/AbstractSaslClientAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/AbstractSaslClientAuthenticationProvider.java
new file mode 100644
index 0000000..d018ce1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/AbstractSaslClientAuthenticationProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Base implementation of {@link SaslClientAuthenticationProvider}. All implementations should
+ * extend this class instead of directly implementing the interface.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.AUTHENTICATION)
+@InterfaceStability.Evolving
+public abstract class AbstractSaslClientAuthenticationProvider implements
+        SaslClientAuthenticationProvider {
+  public static final String AUTH_TOKEN_TYPE = "HBASE_AUTH_TOKEN";
+
+
+  @Override
+  public final String getTokenKind() {
+    // All HBase authentication tokens are of kind "HBASE_AUTH_TOKEN". We differentiate between
+    // them via the code of each provider's SaslAuthMethod.
+    return AUTH_TOKEN_TYPE;
+  }
+
+  /**
+   * Identifies this provider among others via the hash code of {@link #getSaslAuthMethod()},
+   * which must be unique per provider so that authentication methods are clearly separated.
+   */
+  @Override
+  public final int hashCode() {
+    return getSaslAuthMethod().hashCode();
+  }
+
+  @Override
+  public final boolean equals(Object o) {
+    // SaslClientAuthProviders should be unique via their hashCode().
+    if (o instanceof AbstractSaslClientAuthenticationProvider) {
+      return this.hashCode() == o.hashCode();
+    }
+    return false;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/AuthenticationProviderSelector.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/AuthenticationProviderSelector.java
new file mode 100644
index 0000000..a681d53
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/AuthenticationProviderSelector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.AUTHENTICATION)
+@InterfaceStability.Evolving
+public interface AuthenticationProviderSelector {
+
+  /**
+   * Initializes the implementation with the configuration and the set of available providers.
+   * This method should be called exactly once per implementation prior to calling
+   * {@link #selectProvider(String, User)}.
+   */
+  void configure(Configuration conf,
+      Collection<SaslClientAuthenticationProvider> availableProviders);
+
+  /**
+   * Chooses the authentication provider which should be used given the provided client context
+   * from the authentication providers passed in via {@link #configure(Configuration, Collection)}.
+   */
+  Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> selectProvider(
+      String clusterId, User user);
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/BuiltInProviderSelector.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/BuiltInProviderSelector.java
new file mode 100644
index 0000000..8286380
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/BuiltInProviderSelector.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Objects;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link AuthenticationProviderSelector} which can choose from the
+ * authentication implementations which HBase provides out of the box: Simple, Kerberos, and
+ * Delegation Token authentication.
+ *
+ * This implementation will ignore any {@link SaslAuthenticationProvider}'s which are available
+ * on the classpath or specified in the configuration because HBase cannot correctly choose which
+ * token should be returned to a client when multiple are present. It is expected that users
+ * implement their own {@link AuthenticationProviderSelector} when writing a custom provider.
+ *
+ * This implementation is not thread-safe. {@link #configure(Configuration, Collection)} and
+ * {@link #selectProvider(String, User)} are not safe if they are called concurrently.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.AUTHENTICATION)
+@NotThreadSafe
+public class BuiltInProviderSelector implements AuthenticationProviderSelector {
+  private static final Logger LOG = LoggerFactory.getLogger(BuiltInProviderSelector.class);
+
+  Configuration conf;
+  SimpleSaslClientAuthenticationProvider simpleAuth = null;
+  GssSaslClientAuthenticationProvider krbAuth = null;
+  DigestSaslClientAuthenticationProvider digestAuth = null;
+  Text digestAuthTokenKind = null;
+
+  @Override
+  public void configure(
+      Configuration conf, Collection<SaslClientAuthenticationProvider> providers) {
+    if (this.conf != null) {
+      throw new IllegalStateException("configure() should only be called once");
+    }
+    this.conf = Objects.requireNonNull(conf);
+
+    for (SaslClientAuthenticationProvider provider : Objects.requireNonNull(providers)) {
+      final String name = provider.getSaslAuthMethod().getName();
+      if (SimpleSaslAuthenticationProvider.SASL_AUTH_METHOD.getName().contentEquals(name)) {
+        if (simpleAuth != null) {
+          throw new IllegalStateException(
+              "Encountered multiple SimpleSaslClientAuthenticationProvider instances");
+        }
+        simpleAuth = (SimpleSaslClientAuthenticationProvider) provider;
+      } else if (GssSaslAuthenticationProvider.SASL_AUTH_METHOD.getName().equals(name)) {
+        if (krbAuth != null) {
+          throw new IllegalStateException(
+              "Encountered multiple GssSaslClientAuthenticationProvider instances");
+        }
+        krbAuth = (GssSaslClientAuthenticationProvider) provider;
+      } else if (DigestSaslAuthenticationProvider.SASL_AUTH_METHOD.getName().equals(name)) {
+        if (digestAuth != null) {
+          throw new IllegalStateException(
+              "Encountered multiple DigestSaslClientAuthenticationProvider instances");
+        }
+        digestAuth = (DigestSaslClientAuthenticationProvider) provider;
+        digestAuthTokenKind = new Text(digestAuth.getTokenKind());
+      } else {
+        LOG.warn("Ignoring unknown SaslClientAuthenticationProvider: {}", provider.getClass());
+      }
+    }
+    if (simpleAuth == null || krbAuth == null || digestAuth == null) {
+      throw new IllegalStateException("Failed to load SIMPLE, KERBEROS, and DIGEST authentication "
+          + "providers. Classpath is not sane.");
+    }
+  }
+
+  @Override
+  public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> selectProvider(
+      String clusterId, User user) {
+    requireNonNull(clusterId, "Null clusterId was given");
+    requireNonNull(user, "Null user was given");
+
+    // Superfluous: we don't do SIMPLE auth over SASL, but we should to simplify.
+    if (!User.isHBaseSecurityEnabled(conf)) {
+      return new Pair<>(simpleAuth, null);
+    }
+
+    final Text clusterIdAsText = new Text(clusterId);
+
+    // Must be digest auth, look for a token.
+    // TestGenerateDelegationToken is written expecting DT is used when DT and Krb are both present.
+    // (for whatever that's worth).
+    for (Token<? extends TokenIdentifier> token : user.getTokens()) {
+      // We need to check for two things:
+      //   1. This token is for the HBase cluster we want to talk to
+      //   2. We have a supporting client implementation to handle the token (the "kind" of token)
+      if (clusterIdAsText.equals(token.getService()) &&
+          digestAuthTokenKind.equals(token.getKind())) {
+        return new Pair<>(digestAuth, token);
+      }
+    }
+    if (user.getUGI().hasKerberosCredentials()) {
+      return new Pair<>(krbAuth, null);
+    }
+    LOG.debug(
+        "No matching SASL authentication provider and supporting token found from providers.");
+    return null;
+  }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/BuiltInSaslAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/BuiltInSaslAuthenticationProvider.java
new file mode 100644
index 0000000..c1b7ddb
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/BuiltInSaslAuthenticationProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Base class for all Apache HBase, built-in {@link SaslAuthenticationProvider}'s to extend.
+ *
+ * HBase users should take care to note that this class (and its sub-classes) are marked with the
+ * {@code InterfaceAudience.Private} annotation. These implementations are available for users to
+ * read, copy, and modify, but should not be extended or re-used in binary form. There are no
+ * compatibility guarantees provided for implementations of this class.
+ */
+@InterfaceAudience.Private
+public abstract class BuiltInSaslAuthenticationProvider implements SaslAuthenticationProvider {
+
+  public static final String AUTH_TOKEN_TYPE = "HBASE_AUTH_TOKEN";
+
+  @Override
+  public String getTokenKind() {
+    return AUTH_TOKEN_TYPE;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslAuthenticationProvider.java
new file mode 100644
index 0000000..7cbdecd
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslAuthenticationProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Base class for client/server implementations for the HBase delegation token auth'n method.
+ */
+@InterfaceAudience.Private
+public class DigestSaslAuthenticationProvider extends BuiltInSaslAuthenticationProvider {
+
+  public static final SaslAuthMethod SASL_AUTH_METHOD = new SaslAuthMethod(
+      "DIGEST", (byte)82, "DIGEST-MD5", AuthenticationMethod.TOKEN);
+
+  @Override
+  public SaslAuthMethod getSaslAuthMethod() {
+    return SASL_AUTH_METHOD;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java
new file mode 100644
index 0000000..a84f24b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
+
+@InterfaceAudience.Private
+public class DigestSaslClientAuthenticationProvider extends DigestSaslAuthenticationProvider
+    implements SaslClientAuthenticationProvider {
+
+  @Override
+  public SaslClient createClient(Configuration conf, InetAddress serverAddr,
+      SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
+      Map<String, String> saslProps) throws IOException {
+    return Sasl.createSaslClient(new String[] { getSaslAuthMethod().getSaslMechanism() }, null,
+        null, SaslUtil.SASL_DEFAULT_REALM, saslProps, new DigestSaslClientCallbackHandler(token));
+  }
+
+  public static class DigestSaslClientCallbackHandler implements CallbackHandler {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(DigestSaslClientCallbackHandler.class);
+    private final String userName;
+    private final char[] userPassword;
+
+    public DigestSaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+      this.userName = SaslUtil.encodeIdentifier(token.getIdentifier());
+      this.userPassword = SaslUtil.encodePassword(token.getPassword());
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        LOG.debug("SASL client callback: setting username: {}", userName);
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        LOG.debug("SASL client callback: setting userPassword");
+        pc.setPassword(userPassword);
+      }
+      if (rc != null) {
+        LOG.debug("SASL client callback: setting realm: {}", rc.getDefaultText());
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+
+  @Override
+  public UserInformation getUserInfo(User user) {
+    // Don't send user for token auth. Copied from RpcConnection.
+    return null;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslAuthenticationProvider.java
new file mode 100644
index 0000000..0710184
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslAuthenticationProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Base class for client/server implementations for the "KERBEROS" HBase auth'n method.
+ */
+@InterfaceAudience.Private
+public class GssSaslAuthenticationProvider extends BuiltInSaslAuthenticationProvider {
+
+  public static final SaslAuthMethod SASL_AUTH_METHOD = new SaslAuthMethod(
+      "KERBEROS", (byte)81, "GSSAPI", AuthenticationMethod.KERBEROS);
+
+  @Override
+  public SaslAuthMethod getSaslAuthMethod() {
+    return SASL_AUTH_METHOD;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java
new file mode 100644
index 0000000..2db865d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Map;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
+
+@InterfaceAudience.Private
+public class GssSaslClientAuthenticationProvider extends GssSaslAuthenticationProvider
+    implements SaslClientAuthenticationProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      GssSaslClientAuthenticationProvider.class);
+
+  String getServerPrincipal(Configuration conf, SecurityInfo securityInfo, InetAddress server)
+      throws IOException {
+    String serverKey = securityInfo.getServerPrincipal();
+    if (serverKey == null) {
+      throw new IllegalArgumentException(
+          "Can't obtain server Kerberos config key from SecurityInfo");
+    }
+    return SecurityUtil.getServerPrincipal(conf.get(serverKey),
+        server.getCanonicalHostName().toLowerCase());
+  }
+
+  @Override
+  public SaslClient createClient(Configuration conf, InetAddress serverAddr,
+      SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
+      Map<String, String> saslProps) throws IOException {
+    String serverPrincipal = getServerPrincipal(conf, securityInfo, serverAddr);
+    LOG.debug("Setting up Kerberos RPC to server={}", serverPrincipal);
+    String[] names = SaslUtil.splitKerberosName(serverPrincipal);
+    if (names.length != 3) {
+      throw new IOException("Kerberos principal '" + serverPrincipal
+          + "' does not have the expected format");
+    }
+    return Sasl.createSaslClient(new String[] { getSaslAuthMethod().getSaslMechanism() }, null,
+        names[0], names[1], saslProps, null);
+  }
+
+  @Override
+  public UserInformation getUserInfo(User user) {
+    UserInformation.Builder userInfoPB = UserInformation.newBuilder();
+    // Send effective user for Kerberos auth
+    userInfoPB.setEffectiveUser(user.getUGI().getUserName());
+    return userInfoPB.build();
+  }
+
+  @Override
+  public boolean canRetry() {
+    return true;
+  }
+
+  @Override
+  public void relogin() throws IOException {
+    // Check if UGI thinks we need to do another login
+    if (UserGroupInformation.isLoginKeytabBased()) {
+      UserGroupInformation.getLoginUser().reloginFromKeytab();
+    } else {
+      UserGroupInformation.getLoginUser().reloginFromTicketCache();
+    }
+  }
+
+  @Override
+  public UserGroupInformation getRealUser(User user) {
+    final UserGroupInformation ugi = user.getUGI();
+    // Unwrap the UGI with the real user when we're using Kerberos auth
+    if (ugi != null && ugi.getRealUser() != null) {
+      return ugi.getRealUser();
+    }
+
+    // Otherwise, use the UGI we were given
+    return ugi;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslAuthMethod.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslAuthMethod.java
new file mode 100644
index 0000000..7930564
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslAuthMethod.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Describes the way in which some {@link SaslClientAuthenticationProvider} authenticates over SASL.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.AUTHENTICATION)
+@InterfaceStability.Evolving
+public class SaslAuthMethod {
+
+  private final String name;
+  private final byte code;
+  private final String saslMech;
+  private final AuthenticationMethod method;
+
+  public SaslAuthMethod(String name, byte code, String saslMech, AuthenticationMethod method) {
+    this.name = name;
+    this.code = code;
+    this.saslMech = saslMech;
+    this.method = method;
+  }
+
+  /**
+   * Returns the unique name to identify this authentication method among other HBase auth methods.
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Returns the unique value to identify this authentication method among other HBase auth methods.
+   */
+  public byte getCode() {
+    return code;
+  }
+
+  /**
+   * Returns the SASL mechanism used by this authentication method.
+   */
+  public String getSaslMechanism() {
+    return saslMech;
+  }
+
+  /**
+   * Returns the Hadoop {@link AuthenticationMethod} for this method.
+   */
+  public AuthenticationMethod getAuthMethod() {
+    return method;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof SaslAuthMethod)) {
+      return false;
+    }
+    SaslAuthMethod other = (SaslAuthMethod) o;
+    return Objects.equals(name, other.name) &&
+        code == other.code &&
+        Objects.equals(saslMech, other.saslMech) &&
+        Objects.equals(method, other.method);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder()
+        .append(name)
+        .append(code)
+        .append(saslMech)
+        .append(method)
+        .toHashCode();
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslAuthenticationProvider.java
new file mode 100644
index 0000000..1f6d821
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslAuthenticationProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Encapsulation of client-side logic to authenticate to HBase via some means over SASL.
+ * It is suggested that custom implementations extend the abstract class in the type hierarchy
+ * instead of directly implementing this interface (clients have a base class available, but
+ * servers presently do not).
+ *
+ * Implementations of this interface <b>must</b> be unique among each other via the {@code byte}
+ * returned by {@link SaslAuthMethod#getCode()} on {@link #getSaslAuthMethod()}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.AUTHENTICATION)
+@InterfaceStability.Evolving
+public interface SaslAuthenticationProvider {
+
+  /**
+   * Returns the attributes which identify how this provider authenticates.
+   */
+  SaslAuthMethod getSaslAuthMethod();
+
+  /**
+   * Returns the name of the type used by the TokenIdentifier.
+   */
+  String getTokenKind();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java
new file mode 100644
index 0000000..4b1cabc
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Map;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
+
+/**
+ * Encapsulation of client-side logic to authenticate to HBase via some means over SASL.
+ * Implementations should not directly implement this interface, but instead extend
+ * {@link AbstractSaslClientAuthenticationProvider}.
+ *
+ * Implementations of this interface must make an implementation of {@code hashCode()}
+ * which returns the same value across multiple instances of the provider implementation.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.AUTHENTICATION)
+@InterfaceStability.Evolving
+public interface SaslClientAuthenticationProvider extends SaslAuthenticationProvider {
+
+  /**
+   * Creates the SASL client instance for this auth'n method.
+   */
+  SaslClient createClient(Configuration conf, InetAddress serverAddr, SecurityInfo securityInfo,
+      Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
+      Map<String, String> saslProps) throws IOException;
+
+  /**
+   * Constructs a {@link UserInformation} from the given {@link User}.
+   */
+  UserInformation getUserInfo(User user);
+
+  /**
+   * Returns the "real" user, the user who has the credentials being authenticated by the
+   * remote service, in the form of an {@link UserGroupInformation} object.
+   *
+   * It is common in the Hadoop "world" to have distinct notions of a "real" user and a "proxy"
+   * user. A "real" user is the user which actually has the credentials (often, a Kerberos ticket),
+   * but some code may be running as some other user who has no credentials. This method gives
+   * the authentication provider a chance to acknowledge this is happening and ensure that any
+   * RPCs are executed with the real user's credentials, because executing them as the proxy user
+   * would result in failure because no credentials exist to authenticate the RPC.
+   *
+   * Not all implementations will need to implement this method. By default, the provided User's
+   * UGI is returned directly.
+   */
+  default UserGroupInformation getRealUser(User ugi) {
+    return ugi.getUGI();
+  }
+
+  /**
+   * Returns true if the implementation is capable of performing some action which may allow a
+   * failed authentication to become a successful authentication. Otherwise, returns false.
+   */
+  default boolean canRetry() {
+    return false;
+  }
+
+  /**
+   * Executes any necessary logic to re-login the client. Not all implementations will have
+   * any logic that needs to be executed.
+   */
+  default void relogin() throws IOException {}
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProviders.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProviders.java
new file mode 100644
index 0000000..56c36e8
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProviders.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Accessor for all {@link SaslClientAuthenticationProvider} instances.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.AUTHENTICATION)
+@InterfaceStability.Evolving
+public final class SaslClientAuthenticationProviders {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SaslClientAuthenticationProviders.class);
+
+  public static final String SELECTOR_KEY = "hbase.client.sasl.provider.class";
+  public static final String EXTRA_PROVIDERS_KEY = "hbase.client.sasl.provider.extras";
+
+  private static final AtomicReference<SaslClientAuthenticationProviders> providersRef =
+      new AtomicReference<>();
+
+  private final Collection<SaslClientAuthenticationProvider> providers;
+  private final AuthenticationProviderSelector selector;
+
+  private SaslClientAuthenticationProviders(
+      Collection<SaslClientAuthenticationProvider> providers,
+      AuthenticationProviderSelector selector) {
+    this.providers = providers;
+    this.selector = selector;
+  }
+
+  /**
+   * Returns the number of providers that have been registered.
+   */
+  public int getNumRegisteredProviders() {
+    return providers.size();
+  }
+
+  /**
+   * Returns a singleton instance of {@link SaslClientAuthenticationProviders}.
+   */
+  public static synchronized SaslClientAuthenticationProviders getInstance(Configuration conf) {
+    SaslClientAuthenticationProviders providers = providersRef.get();
+    if (providers == null) {
+      providers = instantiate(conf);
+      providersRef.set(providers);
+    }
+
+    return providers;
+  }
+
+  /**
+   * Removes the cached singleton instance of {@link SaslClientAuthenticationProviders}.
+   */
+  public static synchronized void reset() {
+    providersRef.set(null);
+  }
+
+  /**
+   * Adds the given {@code provider} to the map, keyed by its SASL auth method code. Throws a
+   * {@link RuntimeException} if a provider with the same code is already registered.
+   */
+  static void addProviderIfNotExists(SaslClientAuthenticationProvider provider,
+      HashMap<Byte,SaslClientAuthenticationProvider> providers) {
+    Byte code = provider.getSaslAuthMethod().getCode();
+    SaslClientAuthenticationProvider existingProvider = providers.get(code);
+    if (existingProvider != null) {
+      throw new RuntimeException("Already registered authentication provider with " + code + " "
+          + existingProvider.getClass());
+    }
+    providers.put(code, provider);
+  }
+
+  /**
+   * Instantiates the {@link AuthenticationProviderSelector} from the provided configuration.
+   */
+  static AuthenticationProviderSelector instantiateSelector(Configuration conf,
+      Collection<SaslClientAuthenticationProvider> providers) {
+    Class<? extends AuthenticationProviderSelector> clz = conf.getClass(
+        SELECTOR_KEY, BuiltInProviderSelector.class, AuthenticationProviderSelector.class);
+    try {
+      AuthenticationProviderSelector selector = clz.newInstance();
+      selector.configure(conf, providers);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Loaded ProviderSelector {}", selector.getClass());
+      }
+      return selector;
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException("Failed to instantiate " + clz +
+          " as the ProviderSelector defined by " + SELECTOR_KEY, e);
+    }
+  }
+
+  /**
+   * Extracts and instantiates authentication providers from the configuration.
+   */
+  static void addExplicitProviders(Configuration conf,
+      HashMap<Byte,SaslClientAuthenticationProvider> providers) {
+    for(String implName : conf.getStringCollection(EXTRA_PROVIDERS_KEY)) {
+      Class<?> clz;
+      // Load the class from the config
+      try {
+        clz = Class.forName(implName);
+      } catch (ClassNotFoundException e) {
+        LOG.warn("Failed to load SaslClientAuthenticationProvider {}", implName, e);
+        continue;
+      }
+
+      // Make sure it's the right type
+      if (!SaslClientAuthenticationProvider.class.isAssignableFrom(clz)) {
+        LOG.warn("Ignoring SaslClientAuthenticationProvider {} because it is not an instance of"
+            + " SaslClientAuthenticationProvider", clz);
+        continue;
+      }
+
+      // Instantiate it
+      SaslClientAuthenticationProvider provider;
+      try {
+        provider = (SaslClientAuthenticationProvider) clz.newInstance();
+      } catch (InstantiationException | IllegalAccessException e) {
+        LOG.warn("Failed to instantiate SaslClientAuthenticationProvider {}", clz, e);
+        continue;
+      }
+
+      // Add it to our map; addProviderIfNotExists throws if a provider with the same
+      // auth method code has already been registered.
+      addProviderIfNotExists(provider, providers);
+    }
+  }
+
+  /**
+   * Instantiates all client authentication providers and returns an instance of
+   * {@link SaslClientAuthenticationProviders}.
+   */
+  static SaslClientAuthenticationProviders instantiate(Configuration conf) {
+    ServiceLoader<SaslClientAuthenticationProvider> loader =
+        ServiceLoader.load(SaslClientAuthenticationProvider.class);
+    HashMap<Byte,SaslClientAuthenticationProvider> providerMap = new HashMap<>();
+    for (SaslClientAuthenticationProvider provider : loader) {
+      addProviderIfNotExists(provider, providerMap);
+    }
+
+    addExplicitProviders(conf, providerMap);
+
+    Collection<SaslClientAuthenticationProvider> providers = Collections.unmodifiableCollection(
+        providerMap.values());
+
+    if (LOG.isTraceEnabled()) {
+      String loadedProviders = providers.stream()
+          .map((provider) -> provider.getClass().getName())
+          .collect(Collectors.joining(", "));
+      LOG.trace("Found SaslClientAuthenticationProviders {}", loadedProviders);
+    }
+
+    AuthenticationProviderSelector selector = instantiateSelector(conf, providers);
+    return new SaslClientAuthenticationProviders(providers, selector);
+  }
+
+  /**
+   * Returns the provider and token pair for SIMPLE authentication.
+   *
+   * This method is a "hack" while SIMPLE authentication for HBase does not flow through
+   * the SASL codepath.
+   */
+  public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
+      getSimpleProvider() {
+    Optional<SaslClientAuthenticationProvider> optional = providers.stream()
+        .filter((p) -> p instanceof SimpleSaslClientAuthenticationProvider)
+        .findFirst();
+    return new Pair<>(optional.get(), null);
+  }
+
+  /**
+   * Chooses the best authentication provider and corresponding token given the HBase cluster
+   * identifier and the user.
+   */
+  public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> selectProvider(
+      String clusterId, User clientUser) {
+    return selector.selectProvider(clusterId, clientUser);
+  }
+
+  @Override
+  public String toString() {
+    return providers.stream()
+        .map((p) -> p.getClass().getName())
+        .collect(Collectors.joining(", ", "providers=[", "], selector=")) + selector.getClass();
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslAuthenticationProvider.java
new file mode 100644
index 0000000..3f1122c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslAuthenticationProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Base class for client/server implementations for the "SIMPLE" HBase auth'n method.
+ */
+@InterfaceAudience.Private
+public class SimpleSaslAuthenticationProvider extends BuiltInSaslAuthenticationProvider {
+  public static final SaslAuthMethod SASL_AUTH_METHOD = new SaslAuthMethod(
+      "SIMPLE", (byte)80, "", AuthenticationMethod.SIMPLE);
+
+  @Override
+  public SaslAuthMethod getSaslAuthMethod() {
+    return SASL_AUTH_METHOD;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java
new file mode 100644
index 0000000..3a9142f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Map;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
+
+@InterfaceAudience.Private
+public class SimpleSaslClientAuthenticationProvider extends
+    SimpleSaslAuthenticationProvider implements SaslClientAuthenticationProvider {
+
+  @Override
+  public SaslClient createClient(Configuration conf, InetAddress serverAddress,
+      SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
+      Map<String, String> saslProps) throws IOException {
+    return null;
+  }
+
+  @Override
+  public UserInformation getUserInfo(User user) {
+    final UserGroupInformation ugi = user.getUGI();
+    UserInformation.Builder userInfoPB = UserInformation.newBuilder();
+    // Send both effective user and real user for simple auth
+    userInfoPB.setEffectiveUser(ugi.getUserName());
+    if (ugi.getRealUser() != null) {
+      userInfoPB.setRealUser(ugi.getRealUser().getUserName());
+    }
+    return userInfoPB.build();
+  }
+}
diff --git a/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider
new file mode 100644
index 0000000..713f469
--- /dev/null
+++ b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.hadoop.hbase.security.provider.GssSaslClientAuthenticationProvider
+org.apache.hadoop.hbase.security.provider.DigestSaslClientAuthenticationProvider
+org.apache.hadoop.hbase.security.provider.SimpleSaslClientAuthenticationProvider
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
index 5e97ada..9fc510c 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
@@ -21,25 +21,32 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Map;
+
 import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
 import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.TextOutputCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.security.AbstractHBaseSaslRpcClient.SaslClientCallbackHandler;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.security.provider.DigestSaslClientAuthenticationProvider;
+import org.apache.hadoop.hbase.security.provider.DigestSaslClientAuthenticationProvider.DigestSaslClientCallbackHandler;
+import org.apache.hadoop.hbase.security.provider.GssSaslClientAuthenticationProvider;
+import org.apache.hadoop.hbase.security.provider.SimpleSaslClientAuthenticationProvider;
 import org.apache.hadoop.hbase.testclassification.SecurityTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -89,9 +96,11 @@ public class TestHBaseSaslRpcClient {
   public void testSaslClientUsesGivenRpcProtection() throws Exception {
     Token<? extends TokenIdentifier> token = createTokenMockWithCredentials(DEFAULT_USER_NAME,
         DEFAULT_USER_PASSWORD);
+    DigestSaslClientAuthenticationProvider provider = new DigestSaslClientAuthenticationProvider();
     for (SaslUtil.QualityOfProtection qop : SaslUtil.QualityOfProtection.values()) {
-      String negotiatedQop = new HBaseSaslRpcClient(AuthMethod.DIGEST, token,
-          "principal/host@DOMAIN.COM", false, qop.name(), false) {
+      String negotiatedQop = new HBaseSaslRpcClient(HBaseConfiguration.create(), provider, token,
+          Mockito.mock(InetAddress.class), Mockito.mock(SecurityInfo.class), false, qop.name(),
+          false) {
         public String getQop() {
           return saslProps.get(Sasl.QOP);
         }
@@ -101,7 +110,7 @@ public class TestHBaseSaslRpcClient {
   }
 
   @Test
-  public void testSaslClientCallbackHandler() throws UnsupportedCallbackException {
+  public void testDigestSaslClientCallbackHandler() throws UnsupportedCallbackException {
     final Token<? extends TokenIdentifier> token = createTokenMock();
     when(token.getIdentifier()).thenReturn(Bytes.toBytes(DEFAULT_USER_NAME));
     when(token.getPassword()).thenReturn(Bytes.toBytes(DEFAULT_USER_PASSWORD));
@@ -109,28 +118,29 @@ public class TestHBaseSaslRpcClient {
     final NameCallback nameCallback = mock(NameCallback.class);
     final PasswordCallback passwordCallback = mock(PasswordCallback.class);
     final RealmCallback realmCallback = mock(RealmCallback.class);
-    final RealmChoiceCallback realmChoiceCallback = mock(RealmChoiceCallback.class);
 
-    Callback[] callbackArray = {nameCallback, passwordCallback, realmCallback, realmChoiceCallback};
-    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
+    // We can provide a realmCallback, but HBase presently does nothing with it.
+    Callback[] callbackArray = {nameCallback, passwordCallback, realmCallback};
+    final DigestSaslClientCallbackHandler saslClCallbackHandler =
+        new DigestSaslClientCallbackHandler(token);
     saslClCallbackHandler.handle(callbackArray);
     verify(nameCallback).setName(anyString());
-    verify(realmCallback).setText(any());
     verify(passwordCallback).setPassword(any());
   }
 
   @Test
-  public void testSaslClientCallbackHandlerWithException() {
+  public void testDigestSaslClientCallbackHandlerWithException() {
     final Token<? extends TokenIdentifier> token = createTokenMock();
     when(token.getIdentifier()).thenReturn(Bytes.toBytes(DEFAULT_USER_NAME));
     when(token.getPassword()).thenReturn(Bytes.toBytes(DEFAULT_USER_PASSWORD));
-    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
+    final DigestSaslClientCallbackHandler saslClCallbackHandler =
+        new DigestSaslClientCallbackHandler(token);
     try {
       saslClCallbackHandler.handle(new Callback[] { mock(TextOutputCallback.class) });
     } catch (UnsupportedCallbackException expEx) {
       //expected
     } catch (Exception ex) {
-      fail("testSaslClientCallbackHandlerWithException error : " + ex.getMessage());
+      fail("testDigestSaslClientCallbackHandlerWithException error : " + ex.getMessage());
     }
   }
 
@@ -196,21 +206,17 @@ public class TestHBaseSaslRpcClient {
     boolean inState = false;
     boolean outState = false;
 
-    HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
-        createTokenMockWithCredentials(principal, password), principal, false) {
-      @Override
-      public SaslClient createDigestSaslClient(String[] mechanismNames,
-          String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
-              throws IOException {
-        return Mockito.mock(SaslClient.class);
-      }
-
+    DigestSaslClientAuthenticationProvider provider = new DigestSaslClientAuthenticationProvider() {
       @Override
-      public SaslClient createKerberosSaslClient(String[] mechanismNames,
-          String userFirstPart, String userSecondPart) throws IOException {
+      public SaslClient createClient(Configuration conf, InetAddress serverAddress,
+          SecurityInfo securityInfo, Token<? extends TokenIdentifier> token,
+          boolean fallbackAllowed, Map<String, String> saslProps) {
         return Mockito.mock(SaslClient.class);
       }
     };
+    HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(HBaseConfiguration.create(), provider,
+        createTokenMockWithCredentials(principal, password), Mockito.mock(InetAddress.class),
+        Mockito.mock(SecurityInfo.class), false);
 
     try {
       rpcClient.getInputStream();
@@ -231,21 +237,19 @@ public class TestHBaseSaslRpcClient {
 
   private boolean assertIOExceptionThenSaslClientIsNull(String principal, String password) {
     try {
-      new HBaseSaslRpcClient(AuthMethod.DIGEST,
-          createTokenMockWithCredentials(principal, password), principal, false) {
-        @Override
-        public SaslClient createDigestSaslClient(String[] mechanismNames,
-            String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
-                throws IOException {
-          return null;
-        }
-
+      DigestSaslClientAuthenticationProvider provider =
+          new DigestSaslClientAuthenticationProvider() {
         @Override
-        public SaslClient createKerberosSaslClient(String[] mechanismNames,
-            String userFirstPart, String userSecondPart) throws IOException {
+        public SaslClient createClient(Configuration conf, InetAddress serverAddress,
+            SecurityInfo securityInfo,
+            Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
+            Map<String, String> saslProps) {
           return null;
         }
       };
+      new HBaseSaslRpcClient(HBaseConfiguration.create(), provider,
+          createTokenMockWithCredentials(principal, password), Mockito.mock(InetAddress.class),
+          Mockito.mock(SecurityInfo.class), false);
       return false;
     } catch (IOException ex) {
       return true;
@@ -265,8 +269,10 @@ public class TestHBaseSaslRpcClient {
   private boolean assertSuccessCreationDigestPrincipal(String principal, String password) {
     HBaseSaslRpcClient rpcClient = null;
     try {
-      rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
-          createTokenMockWithCredentials(principal, password), principal, false);
+      rpcClient = new HBaseSaslRpcClient(HBaseConfiguration.create(),
+          new DigestSaslClientAuthenticationProvider(),
+          createTokenMockWithCredentials(principal, password), Mockito.mock(InetAddress.class),
+          Mockito.mock(SecurityInfo.class), false);
     } catch(Exception ex) {
       LOG.error(ex.getMessage(), ex);
     }
@@ -285,7 +291,9 @@ public class TestHBaseSaslRpcClient {
 
   private HBaseSaslRpcClient createSaslRpcClientForKerberos(String principal)
       throws IOException {
-    return new HBaseSaslRpcClient(AuthMethod.KERBEROS, createTokenMock(), principal, false);
+    return new HBaseSaslRpcClient(HBaseConfiguration.create(),
+        new GssSaslClientAuthenticationProvider(), createTokenMock(),
+        Mockito.mock(InetAddress.class), Mockito.mock(SecurityInfo.class), false);
   }
 
   private Token<? extends TokenIdentifier> createTokenMockWithCredentials(
@@ -301,7 +309,9 @@ public class TestHBaseSaslRpcClient {
 
   private HBaseSaslRpcClient createSaslRpcClientSimple(String principal, String password)
       throws IOException {
-    return new HBaseSaslRpcClient(AuthMethod.SIMPLE, createTokenMock(), principal, false);
+    return new HBaseSaslRpcClient(HBaseConfiguration.create(),
+        new SimpleSaslClientAuthenticationProvider(), createTokenMock(),
+        Mockito.mock(InetAddress.class), Mockito.mock(SecurityInfo.class), false);
   }
 
   @SuppressWarnings("unchecked")
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/provider/TestDefaultProviderSelector.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/provider/TestDefaultProviderSelector.java
new file mode 100644
index 0000000..eff3b5f
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/provider/TestDefaultProviderSelector.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class})
+public class TestDefaultProviderSelector {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestDefaultProviderSelector.class);
+
+  BuiltInProviderSelector selector;
+  @Before
+  public void setup() {
+    selector = new BuiltInProviderSelector();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testExceptionOnMissingProviders() {
+    selector.configure(new Configuration(false), Collections.emptySet());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullConfiguration() {
+    selector.configure(null, Collections.emptySet());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullProviderMap() {
+    selector.configure(new Configuration(false), null);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testDuplicateProviders() {
+    Set<SaslClientAuthenticationProvider> providers = new HashSet<>();
+    providers.add(new SimpleSaslClientAuthenticationProvider());
+    providers.add(new SimpleSaslClientAuthenticationProvider());
+    selector.configure(new Configuration(false), providers);
+  }
+
+  @Test
+  public void testExpectedProviders() {
+    HashSet<SaslClientAuthenticationProvider> providers = new HashSet<>(Arrays.asList(
+        new SimpleSaslClientAuthenticationProvider(), new GssSaslClientAuthenticationProvider(),
+        new DigestSaslClientAuthenticationProvider()));
+
+    selector.configure(new Configuration(false), providers);
+
+    assertNotNull("Simple provider was null", selector.simpleAuth);
+    assertNotNull("Kerberos provider was null", selector.krbAuth);
+    assertNotNull("Digest provider was null", selector.digestAuth);
+  }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/provider/TestSaslClientAuthenticationProviders.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/provider/TestSaslClientAuthenticationProviders.java
new file mode 100644
index 0000000..c131702
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/provider/TestSaslClientAuthenticationProviders.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
+
+/**
+ * Tests provider registration and instance caching of {@link SaslClientAuthenticationProviders}.
+ */
+@Category({SmallTests.class, SecurityTests.class})
+public class TestSaslClientAuthenticationProviders {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSaslClientAuthenticationProviders.class);
+
+  /**
+   * Registers two instances of the simple provider and verifies that the second registration
+   * neither replaces the first instance nor grows the registry.
+   */
+  @Test
+  public void testCannotAddTheSameProviderTwice() {
+    HashMap<Byte,SaslClientAuthenticationProvider> registeredProviders = new HashMap<>();
+    SaslClientAuthenticationProvider p1 = new SimpleSaslClientAuthenticationProvider();
+    SaslClientAuthenticationProvider p2 = new SimpleSaslClientAuthenticationProvider();
+
+    SaslClientAuthenticationProviders.addProviderIfNotExists(p1, registeredProviders);
+    assertEquals(1, registeredProviders.size());
+
+    try {
+      SaslClientAuthenticationProviders.addProviderIfNotExists(p2, registeredProviders);
+    } catch (RuntimeException ignored) {
+      // Deliberately ignored: this test does not care whether the duplicate is rejected with an
+      // exception, only that the registry is left untouched, which is asserted below.
+    }
+
+    assertEquals("Expected the duplicate registration to leave a single provider", 1,
+        registeredProviders.size());
+    assertSame("Expected the original provider to be present", p1,
+        registeredProviders.entrySet().iterator().next().getValue());
+  }
+
+  /**
+   * Verifies that {@code getInstance} returns the same object until {@code reset} is called, and
+   * that the instance created after the reset registers the same number of providers.
+   */
+  @Test
+  public void testInstanceIsCached() {
+    Configuration conf = HBaseConfiguration.create();
+    SaslClientAuthenticationProviders providers1 =
+        SaslClientAuthenticationProviders.getInstance(conf);
+    SaslClientAuthenticationProviders providers2 =
+        SaslClientAuthenticationProviders.getInstance(conf);
+    assertSame(providers1, providers2);
+
+    SaslClientAuthenticationProviders.reset();
+
+    SaslClientAuthenticationProviders providers3 =
+        SaslClientAuthenticationProviders.getInstance(conf);
+    assertNotSame(providers1, providers3);
+    assertEquals(providers1.getNumRegisteredProviders(), providers3.getNumRegisteredProviders());
+  }
+
+  /**
+   * Configures two extra providers whose auth methods use the same code byte and expects
+   * {@code getInstance} to fail with a {@link RuntimeException}.
+   */
+  @Test(expected = RuntimeException.class)
+  public void testDifferentConflictingImplementationsFail() {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY,
+        ConflictingProvider1.class.getName(), ConflictingProvider2.class.getName());
+    SaslClientAuthenticationProviders.getInstance(conf);
+  }
+
+  /**
+   * Stub provider named "FOO" that uses code byte 12; every other method returns {@code null}.
+   */
+  static class ConflictingProvider1 implements SaslClientAuthenticationProvider {
+    static final SaslAuthMethod METHOD1 = new SaslAuthMethod(
+        "FOO", (byte)12, "DIGEST-MD5", AuthenticationMethod.SIMPLE);
+    @Override public SaslAuthMethod getSaslAuthMethod() {
+      return METHOD1;
+    }
+
+    @Override public String getTokenKind() {
+      return null;
+    }
+
+    @Override public SaslClient createClient(Configuration conf, InetAddress serverAddr,
+        SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
+        Map<String, String> saslProps) throws IOException {
+      return null;
+    }
+
+    @Override public UserInformation getUserInfo(User user) {
+      return null;
+    }
+  }
+
+  /**
+   * Stub provider named "BAR" that uses the same code byte (12) as {@link ConflictingProvider1};
+   * every other method returns {@code null}.
+   */
+  static class ConflictingProvider2 implements SaslClientAuthenticationProvider {
+    static final SaslAuthMethod METHOD2 = new SaslAuthMethod(
+        "BAR", (byte)12, "DIGEST-MD5", AuthenticationMethod.SIMPLE);
+    @Override public SaslAuthMethod getSaslAuthMethod() {
+      return METHOD2;
+    }
+
+    @Override public String getTokenKind() {
+      return null;
+    }
+
+    @Override public SaslClient createClient(Configuration conf, InetAddress serverAddr,
+        SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
+        Map<String, String> saslProps) throws IOException {
+      return null;
+    }
+
+    @Override public UserInformation getUserInfo(User user) {
+      return null;
+    }
+  }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
index d5d4643..12b8398 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
@@ -53,4 +53,9 @@ public final class HBaseInterfaceAudience {
    * Denotes classes used by hbck tool for fixing inconsistent state of HBase.
    */
   public static final String HBCK = "HBCK";
+
+  /**
+   * Denotes classes that can be used to build custom authentication solutions.
+   */
+  public static final String AUTHENTICATION = "Authentication";
 }
diff --git a/hbase-examples/pom.xml b/hbase-examples/pom.xml
index 7a77579..84897ca 100644
--- a/hbase-examples/pom.xml
+++ b/hbase-examples/pom.xml
@@ -174,6 +174,28 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>bouncycastle</groupId>
+          <artifactId>bcprov-jdk15</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcprov-jdk15on</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-http</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
   </dependencies>
   <profiles>
     <!-- Skip the tests in this module -->
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/SaslPlainServer.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/SaslPlainServer.java
new file mode 100644
index 0000000..9c2ce9b
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/SaslPlainServer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.provider.example;
+
+import java.nio.charset.StandardCharsets;
+import java.security.Provider;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A {@link SaslServer} for the SASL "PLAIN" mechanism. The single client response carries the
+ * authorization id, authentication id and password separated by NUL characters; verification
+ * of those values is delegated to the {@link CallbackHandler} given at construction time.
+ * <p>
+ * This class was copied from Hadoop Common (3.1.2) and subsequently modified.
+ */
+@InterfaceAudience.Private
+public class SaslPlainServer implements SaslServer {
+  /**
+   * Security {@link Provider} that maps the "PLAIN" SASL server mechanism to
+   * {@link SaslPlainServerFactory}.
+   */
+  @SuppressWarnings("serial")
+  public static class SecurityProvider extends Provider {
+    public SecurityProvider() {
+      super("SaslPlainServer", 1.0, "SASL PLAIN Authentication Server");
+      put("SaslServerFactory.PLAIN",
+          SaslPlainServerFactory.class.getName());
+    }
+  }
+
+  /**
+   * Factory that creates {@link SaslPlainServer} instances for the "PLAIN" mechanism.
+   */
+  public static class SaslPlainServerFactory implements SaslServerFactory {
+    /**
+     * @return a new {@link SaslPlainServer} when {@code mechanism} is "PLAIN", otherwise
+     *         {@code null}
+     */
+    @Override
+    public SaslServer createSaslServer(String mechanism, String protocol,
+        String serverName, Map<String,?> props, CallbackHandler cbh)
+            throws SaslException {
+      return "PLAIN".equals(mechanism) ? new SaslPlainServer(cbh) : null;
+    }
+
+    /**
+     * @return {@code {"PLAIN"}} when no properties are given or when
+     *         {@link Sasl#POLICY_NOPLAINTEXT} is the string "false"; an empty array otherwise
+     */
+    @Override
+    public String[] getMechanismNames(Map<String,?> props){
+      return (props == null) || "false".equals(props.get(Sasl.POLICY_NOPLAINTEXT))
+          ? new String[]{"PLAIN"}
+          : new String[0];
+    }
+  }
+
+  private CallbackHandler cbh;
+  private boolean completed;
+  private String authz;
+
+  SaslPlainServer(CallbackHandler callback) {
+    this.cbh = callback;
+  }
+
+  @Override
+  public String getMechanismName() {
+    return "PLAIN";
+  }
+
+  /**
+   * Processes the one and only PLAIN client response.
+   *
+   * @param response UTF-8 bytes of {@code authz NUL authn NUL password}; an empty authz means
+   *        "same as authn"
+   * @return always {@code null}, PLAIN has no server challenge
+   * @throws IllegalStateException if a response was already evaluated
+   * @throws IllegalArgumentException if {@code response} is {@code null}
+   * @throws SaslException if the response is malformed or the callback handler fails
+   */
+  @Override
+  public byte[] evaluateResponse(byte[] response) throws SaslException {
+    if (completed) {
+      throw new IllegalStateException("PLAIN authentication has completed");
+    }
+    if (response == null) {
+      throw new IllegalArgumentException("Received null response");
+    }
+    PasswordCallback pc = null;
+    try {
+      // Decoding with an explicit Charset never throws; malformed bytes are replaced instead.
+      String payload = new String(response, StandardCharsets.UTF_8);
+      // [ authz, authn, password ]
+      String[] parts = payload.split("\u0000", 3);
+      if (parts.length != 3) {
+        throw new IllegalArgumentException("Received corrupt response");
+      }
+      if (parts[0].isEmpty()) { // authz = authn
+        parts[0] = parts[1];
+      }
+
+      NameCallback nc = new NameCallback("SASL PLAIN");
+      nc.setName(parts[1]);
+      pc = new PasswordCallback("SASL PLAIN", false);
+      pc.setPassword(parts[2].toCharArray());
+      AuthorizeCallback ac = new AuthorizeCallback(parts[1], parts[0]);
+      cbh.handle(new Callback[]{nc, pc, ac});
+      if (ac.isAuthorized()) {
+        authz = ac.getAuthorizedID();
+      }
+    } catch (Exception e) {
+      throw new SaslException("PLAIN auth failed: " + e.toString(), e);
+    } finally {
+      if (pc != null) {
+        // Do not keep the clear-text password alive longer than the handler needed it.
+        pc.clearPassword();
+      }
+      // The exchange is over after one response, whether it succeeded or not.
+      completed = true;
+    }
+    return null;
+  }
+
+  /** Guards accessors that are only meaningful once the exchange has finished. */
+  private void throwIfNotComplete() {
+    if (!completed) {
+      throw new IllegalStateException("PLAIN authentication not completed");
+    }
+  }
+
+  @Override
+  public boolean isComplete() {
+    return completed;
+  }
+
+  /**
+   * @return the authorized id set by the callback handler, or {@code null} when the handler did
+   *         not authorize the request
+   * @throws IllegalStateException if no response has been evaluated yet
+   */
+  @Override
+  public String getAuthorizationID() {
+    throwIfNotComplete();
+    return authz;
+  }
+
+  /**
+   * @return "auth" for {@link Sasl#QOP}, {@code null} for every other property
+   * @throws IllegalStateException if no response has been evaluated yet
+   */
+  @Override
+  public Object getNegotiatedProperty(String propName) {
+    throwIfNotComplete();
+    return Sasl.QOP.equals(propName) ? "auth" : null;
+  }
+
+  /** Always throws {@link IllegalStateException}: PLAIN offers no security layer. */
+  @Override
+  public byte[] wrap(byte[] outgoing, int offset, int len)
+      throws SaslException {
+    throwIfNotComplete();
+    throw new IllegalStateException(
+        "PLAIN supports neither integrity nor privacy");
+  }
+
+  /** Always throws {@link IllegalStateException}: PLAIN offers no security layer. */
+  @Override
+  public byte[] unwrap(byte[] incoming, int offset, int len)
+      throws SaslException {
+    throwIfNotComplete();
+    throw new IllegalStateException(
+        "PLAIN supports neither integrity nor privacy");
+  }
+
+  /** Drops the references to the callback handler and the authorized id. */
+  @Override
+  public void dispose() throws SaslException {
+    cbh = null;
+    authz = null;
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeClientTokenUtil.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeClientTokenUtil.java
new file mode 100644
index 0000000..1f3238a
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeClientTokenUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider.example;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used to acquire tokens for the ShadeSaslAuthenticationProvider.
+ */
+@InterfaceAudience.Private
+public final class ShadeClientTokenUtil {
+
+  private ShadeClientTokenUtil() {}
+
+  /**
+   * Builds a token of kind {@link ShadeSaslAuthenticationProvider#TOKEN_KIND} whose identifier
+   * carries {@code username}, whose password is {@code password}, and whose service is the id
+   * of the cluster {@code conn} is connected to.
+   *
+   * @param conn connection used to look up the cluster id; it is not closed by this method
+   * @param username the user name to place in the token identifier
+   * @param password the user's password
+   * @return the newly created token
+   * @throws IOException if the cluster id cannot be retrieved
+   */
+  public static Token<? extends TokenIdentifier> obtainToken(
+      Connection conn, String username, char[] password) throws IOException {
+    ShadeTokenIdentifier identifier = new ShadeTokenIdentifier(username);
+    final String clusterId;
+    // The Admin is only needed for this one lookup; close it so it is not leaked.
+    try (org.apache.hadoop.hbase.client.Admin admin = conn.getAdmin()) {
+      clusterId = admin.getClusterMetrics().getClusterId();
+    }
+    return new Token<>(identifier.getBytes(), Bytes.toBytes(new String(password)),
+        new Text(ShadeSaslAuthenticationProvider.TOKEN_KIND),
+        new Text(clusterId));
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeProviderSelector.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeProviderSelector.java
new file mode 100644
index 0000000..6e369f9
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeProviderSelector.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider.example;
+
+import java.util.Collection;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.provider.BuiltInProviderSelector;
+import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Provider selector that prefers the Shade provider whenever the user holds a token of kind
+ * {@link ShadeSaslAuthenticationProvider#TOKEN_KIND}, and otherwise returns whatever
+ * {@link BuiltInProviderSelector} selects.
+ */
+@InterfaceAudience.Private
+public class ShadeProviderSelector extends BuiltInProviderSelector {
+
+  // A true constant: identical for every selector instance, so it is shared statically.
+  private static final Text SHADE_TOKEN_KIND_TEXT =
+      new Text(ShadeSaslAuthenticationProvider.TOKEN_KIND);
+  private ShadeSaslClientAuthenticationProvider shade;
+
+  /**
+   * Runs the parent configuration and then remembers the first
+   * {@link ShadeSaslClientAuthenticationProvider} found in {@code providers}.
+   *
+   * @throws RuntimeException if {@code providers} contains no Shade client provider
+   */
+  @Override
+  public void configure(
+      Configuration conf, Collection<SaslClientAuthenticationProvider> providers) {
+    super.configure(conf, providers);
+
+    this.shade = (ShadeSaslClientAuthenticationProvider) providers.stream()
+        .filter((p) -> p instanceof ShadeSaslClientAuthenticationProvider)
+        .findFirst()
+        .orElseThrow(() -> new RuntimeException(
+            "ShadeSaslClientAuthenticationProvider not loaded"));
+  }
+
+  /**
+   * @return the Shade provider paired with the user's first Shade token when such a token
+   *         exists; otherwise the parent selector's choice
+   */
+  @Override
+  public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> selectProvider(
+      String clusterId, User user) {
+    Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> builtInChoice =
+        super.selectProvider(clusterId, user);
+
+    Optional<Token<?>> shadeToken = user.getTokens().stream()
+        .filter((t) -> SHADE_TOKEN_KIND_TEXT.equals(t.getKind()))
+        .findFirst();
+    if (shadeToken.isPresent()) {
+      return new Pair<>(shade, shadeToken.get());
+    }
+
+    return builtInChoice;
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslAuthenticationProvider.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslAuthenticationProvider.java
new file mode 100644
index 0000000..80f7ace
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslAuthenticationProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider.example;
+
+import org.apache.hadoop.hbase.security.provider.SaslAuthMethod;
+import org.apache.hadoop.hbase.security.provider.SaslAuthenticationProvider;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Common base of the client and server halves of the example "Shade" authentication provider.
+ * It fixes the SASL auth method and the token kind that both halves report.
+ */
+@InterfaceAudience.Private
+public abstract class ShadeSaslAuthenticationProvider implements SaslAuthenticationProvider {
+  /** Kind of the tokens handled by the Shade provider. */
+  public static final String TOKEN_KIND = "HBASE_EXAMPLE_SHADE_TOKEN";
+  /** Auth method "SHADE": code byte 15, SASL mechanism "PLAIN", token authentication. */
+  public static final SaslAuthMethod METHOD =
+      new SaslAuthMethod("SHADE", (byte) 15, "PLAIN", AuthenticationMethod.TOKEN);
+
+  /** @return {@link #METHOD} */
+  @Override
+  public SaslAuthMethod getSaslAuthMethod() {
+    return METHOD;
+  }
+
+  /** @return {@link #TOKEN_KIND} */
+  @Override
+  public String getTokenKind() {
+    return TOKEN_KIND;
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java
new file mode 100644
index 0000000..7cda97b
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider.example;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
+
+@InterfaceAudience.Private
+public class ShadeSaslClientAuthenticationProvider extends ShadeSaslAuthenticationProvider
+    implements SaslClientAuthenticationProvider {
+
+  @Override
+  public SaslClient createClient(Configuration conf, InetAddress serverAddr,
+      SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
+      Map<String, String> saslProps) throws IOException {
+    return Sasl.createSaslClient(new String[] { getSaslAuthMethod().getSaslMechanism()}, null, null,
+        SaslUtil.SASL_DEFAULT_REALM, saslProps, new ShadeSaslClientCallbackHandler(token));
+  }
+
+  @Override
+  public UserInformation getUserInfo(User user) {
+    UserInformation.Builder userInfoPB = UserInformation.newBuilder();
+    userInfoPB.setEffectiveUser(user.getUGI().getUserName());
+    return userInfoPB.build();
+  }
+
+  static class ShadeSaslClientCallbackHandler implements CallbackHandler {
+    private final String username;
+    private final char[] password;
+    public ShadeSaslClientCallbackHandler(
+        Token<? extends TokenIdentifier> token) throws IOException {
+      TokenIdentifier id = token.decodeIdentifier();
+      if (id == null) {
+        // Something is wrong with the environment if we can't get our Identifier back out.
+        throw new IllegalStateException("Could not extract Identifier from Token");
+      }
+      this.username = id.getUser().getUserName();
+      this.password = Bytes.toString(token.getPassword()).toCharArray();
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        nc.setName(username);
+      }
+      if (pc != null) {
+        pc.setPassword(password);
+      }
+      if (rc != null) {
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslServerAuthenticationProvider.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslServerAuthenticationProvider.java
new file mode 100644
index 0000000..dc8d89b
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslServerAuthenticationProvider.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider.example;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.security.provider.AttemptingUserProvidingSaslServer;
+import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Server half of the example "Shade" provider. User names and passwords are loaded from the
+ * file named by {@link #PASSWORD_FILE_KEY} and checked against what the client sends through a
+ * {@link SaslPlainServer}.
+ */
+@InterfaceAudience.Private
+public class ShadeSaslServerAuthenticationProvider extends ShadeSaslAuthenticationProvider
+    implements SaslServerAuthenticationProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ShadeSaslServerAuthenticationProvider.class);
+
+  /** Configuration key naming the password file. */
+  public static final String PASSWORD_FILE_KEY = "hbase.security.shade.password.file";
+  /** Separates the user name from the password within one line of the password file. */
+  static final char SEPARATOR = '=';
+
+  // NOTE(review): this single reference is handed to every callback handler created by
+  // createServer(), so all servers made by one provider instance share it - confirm that is
+  // intended.
+  private final AtomicReference<UserGroupInformation> attemptingUser =
+      new AtomicReference<>(null);
+  private Map<String,char[]> passwordDatabase;
+
+  /**
+   * Loads the password database from the file configured in {@code conf}.
+   */
+  @Override
+  public void init(Configuration conf) throws IOException {
+    passwordDatabase = readPasswordDB(conf);
+  }
+
+  /**
+   * Creates a PLAIN SASL server backed by the loaded password database. Neither argument is
+   * used by this implementation.
+   */
+  @Override
+  public AttemptingUserProvidingSaslServer createServer(
+      SecretManager<TokenIdentifier> secretManager, Map<String, String> saslProps)
+          throws IOException {
+    return new AttemptingUserProvidingSaslServer(
+        new SaslPlainServer(
+            new ShadeSaslServerCallbackHandler(attemptingUser, passwordDatabase)),
+      () -> attemptingUser.get());
+  }
+
+  /**
+   * Reads {@code username=password} records, one per line, from the configured password file.
+   * Lines that do not split into at least a user name and a password are skipped with a
+   * warning. Additional separators are treated as part of the password.
+   *
+   * @param conf configuration providing {@link #PASSWORD_FILE_KEY}
+   * @return map from user name to password
+   * @throws RuntimeException if the key is not configured or the file does not exist
+   * @throws IOException if the file cannot be read
+   */
+  Map<String,char[]> readPasswordDB(Configuration conf) throws IOException {
+    String passwordFileName = conf.get(PASSWORD_FILE_KEY);
+    if (passwordFileName == null) {
+      throw new RuntimeException(PASSWORD_FILE_KEY
+          + " is not defined in configuration, cannot use this implementation");
+    }
+
+    Path passwordFile = new Path(passwordFileName);
+    FileSystem fs = passwordFile.getFileSystem(conf);
+    if (!fs.exists(passwordFile)) {
+      throw new RuntimeException("Configured password file does not exist: " + passwordFile);
+    }
+
+    Map<String,char[]> passwordDb = new HashMap<>();
+    // Decode as UTF-8 explicitly: the password arrives from the client as UTF-8 as well, so
+    // the platform default charset must not influence the comparison.
+    try (FSDataInputStream fdis = fs.open(passwordFile);
+        BufferedReader reader = new BufferedReader(
+            new InputStreamReader(fdis, java.nio.charset.StandardCharsets.UTF_8))) {
+      String line;
+      int lineNumber = 0;
+      while ((line = reader.readLine()) != null) {
+        // Count every physical line so the warning below points at the right place.
+        lineNumber++;
+        line = line.trim();
+        String[] parts = StringUtils.split(line, SEPARATOR);
+        if (parts.length < 2) {
+          LOG.warn("Password file contains invalid record on line {}, skipping", lineNumber);
+          continue;
+        }
+
+        final String username = parts[0];
+        StringBuilder builder = new StringBuilder();
+        for (int i = 1; i < parts.length; i++) {
+          // Re-insert the separator between all password pieces, including empty ones.
+          if (i > 1) {
+            builder.append(SEPARATOR);
+          }
+          builder.append(parts[i]);
+        }
+
+        passwordDb.put(username, builder.toString().toCharArray());
+      }
+    }
+
+    return passwordDb;
+  }
+
+  @Override
+  public boolean supportsProtocolAuthentication() {
+    return false;
+  }
+
+  /**
+   * @return a remote-user UGI for {@code authzId}; {@code secretManager} is not used
+   */
+  @Override
+  public UserGroupInformation getAuthorizedUgi(String authzId,
+      SecretManager<TokenIdentifier> secretManager) throws IOException {
+    return UserGroupInformation.createRemoteUser(authzId);
+  }
+
+  /**
+   * Server-side callback handler that verifies the supplied name and password against the
+   * password database and authorizes only requests that run as the authenticated user.
+   */
+  static class ShadeSaslServerCallbackHandler implements CallbackHandler {
+    private final AtomicReference<UserGroupInformation> attemptingUser;
+    private final Map<String,char[]> passwordDatabase;
+
+    public ShadeSaslServerCallbackHandler(AtomicReference<UserGroupInformation> attemptingUser,
+        Map<String,char[]> passwordDatabase) {
+      this.attemptingUser = attemptingUser;
+      this.passwordDatabase = passwordDatabase;
+    }
+
+    /**
+     * Records the attempting user, checks the password and decides on authorization.
+     *
+     * @throws InvalidToken if the user is unknown or the password does not match
+     * @throws UnsupportedCallbackException if a callback of an unexpected type is passed
+     */
+    @Override public void handle(Callback[] callbacks)
+        throws InvalidToken, UnsupportedCallbackException {
+      // Logged without a stack trace and below INFO: this runs for every authentication.
+      LOG.debug("SaslServerCallbackHandler called");
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          continue; // realm is ignored
+        } else {
+          throw new UnsupportedCallbackException(callback, "Unrecognized SASL PLAIN Callback");
+        }
+      }
+
+      if (nc != null && pc != null) {
+        String username = nc.getName();
+
+        UserGroupInformation ugi = createUgiForRemoteUser(username);
+        attemptingUser.set(ugi);
+
+        char[] clientPassword = pc.getPassword();
+        char[] actualPassword = passwordDatabase.get(username);
+        // An unknown user must never authenticate, not even when the callback carries no
+        // password (Arrays.equals(null, null) is true).
+        if (actualPassword == null || !Arrays.equals(clientPassword, actualPassword)) {
+          throw new InvalidToken("Authentication failed for " + username);
+        }
+      }
+
+      if (ac != null) {
+        String authenticatedUserId = ac.getAuthenticationID();
+        String userRequestedToExecuteAs = ac.getAuthorizationID();
+        // Only allow a user to act as themselves.
+        if (authenticatedUserId.equals(userRequestedToExecuteAs)) {
+          ac.setAuthorized(true);
+          ac.setAuthorizedID(userRequestedToExecuteAs);
+        } else {
+          ac.setAuthorized(false);
+        }
+      }
+    }
+
+    /**
+     * @return a remote-user UGI for {@code username} tagged with the Shade auth method
+     */
+    UserGroupInformation createUgiForRemoteUser(String username) {
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username);
+      ugi.setAuthenticationMethod(ShadeSaslAuthenticationProvider.METHOD.getAuthMethod());
+      return ugi;
+    }
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeTokenIdentifier.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeTokenIdentifier.java
new file mode 100644
index 0000000..2255323
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeTokenIdentifier.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider.example;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/** Token identifier for the example "Shade" provider; its only payload is a username. */
+@InterfaceAudience.Private
+public class ShadeTokenIdentifier extends TokenIdentifier {
+  private static final Text TEXT_TOKEN_KIND = new Text(ShadeSaslAuthenticationProvider.TOKEN_KIND);
+  private String username; // null after the no-arg constructor until readFields() runs
+
+  public ShadeTokenIdentifier() {
+    // No-arg constructor so that ServiceLoader can instantiate this class.
+  }
+
+  public ShadeTokenIdentifier(String username) {
+    this.username = requireNonNull(username);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(this.username); // the username is the entire serialized form
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.username = in.readUTF();
+  }
+
+  @Override
+  public Text getKind() {
+    return TEXT_TOKEN_KIND;
+  }
+
+  @Override
+  public UserGroupInformation getUser() {
+    // A missing or empty username yields no user at all.
+    boolean unnamed = username == null || username.isEmpty();
+    return unnamed ? null : UserGroupInformation.createRemoteUser(username);
+  }
+}
diff --git a/hbase-examples/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/hbase-examples/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
new file mode 100644
index 0000000..c9660d7
--- /dev/null
+++ b/hbase-examples/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.hadoop.hbase.security.provider.example.ShadeTokenIdentifier
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/security/provider/example/TestShadeSaslAuthenticationProvider.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/security/provider/example/TestShadeSaslAuthenticationProvider.java
new file mode 100644
index 0000000..001842f
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/security/provider/example/TestShadeSaslAuthenticationProvider.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider.example;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
+import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({MediumTests.class, SecurityTests.class})
+public class TestShadeSaslAuthenticationProvider {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestShadeSaslAuthenticationProvider.class);
+
+  private static final char[] USER1_PASSWORD = "foobarbaz".toCharArray();
+
+  static LocalHBaseCluster createCluster(HBaseTestingUtility util, File keytabFile,
+      MiniKdc kdc, Map<String,char[]> userDatabase) throws Exception {
+    String servicePrincipal = "hbase/localhost";
+    String spnegoPrincipal = "HTTP/localhost";
+    kdc.createPrincipal(keytabFile, servicePrincipal);
+    util.startMiniZKCluster();
+
+    HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(),
+        servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm());
+    HBaseKerberosUtils.setSSLConfiguration(util, TestShadeSaslAuthenticationProvider.class);
+
+    util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        TokenProvider.class.getName());
+    util.startMiniDFSCluster(1);
+    Path testDir = util.getDataTestDirOnTestFS("TestShadeSaslAuthenticationProvider");
+    USER_DATABASE_FILE = new Path(testDir, "user-db.txt");
+
+    createUserDBFile(
+        USER_DATABASE_FILE.getFileSystem(CONF), USER_DATABASE_FILE, userDatabase);
+    CONF.set(ShadeSaslServerAuthenticationProvider.PASSWORD_FILE_KEY,
+        USER_DATABASE_FILE.toString());
+
+    Path rootdir = new Path(testDir, "hbase-root");
+    FSUtils.setRootDir(CONF, rootdir);
+    LocalHBaseCluster cluster = new LocalHBaseCluster(CONF, 1);
+    return cluster;
+  }
+
+  static void createUserDBFile(FileSystem fs, Path p,
+      Map<String,char[]> userDatabase) throws IOException {
+    if (fs.exists(p)) {
+      fs.delete(p, true);
+    }
+    try (FSDataOutputStream out = fs.create(p);
+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out))) {
+      for (Entry<String,char[]> e : userDatabase.entrySet()) {
+        writer.write(e.getKey());
+        writer.write(ShadeSaslServerAuthenticationProvider.SEPARATOR);
+        writer.write(e.getValue());
+        writer.newLine(); // each user is one line: name, SEPARATOR, password
+      }
+    }
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final Configuration CONF = UTIL.getConfiguration();
+  private static LocalHBaseCluster CLUSTER;
+  private static File KEYTAB_FILE;
+  private static Path USER_DATABASE_FILE;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    KEYTAB_FILE = new File(
+        UTIL.getDataTestDir("keytab").toUri().getPath());
+    final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE);
+
+    // Adds our test impls instead of creating service loader entries which
+    // might inadvertently get them loaded on a real cluster.
+    CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY,
+        ShadeSaslClientAuthenticationProvider.class.getName());
+    CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY,
+        ShadeSaslServerAuthenticationProvider.class.getName());
+    CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY,
+        ShadeProviderSelector.class.getName());
+
+    CLUSTER = createCluster(UTIL, KEYTAB_FILE, kdc,
+        Collections.singletonMap("user1", USER1_PASSWORD));
+    CLUSTER.startup();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    if (CLUSTER != null) {
+      CLUSTER.shutdown();
+      CLUSTER = null;
+    }
+    UTIL.shutdownMiniZKCluster(); // NOTE(review): mini DFS cluster and KDC are not stopped
+  }
+
+  @Rule
+  public TestName name = new TestName();
+  TableName tableName;
+  String clusterId;
+
+  @Before
+  public void createTable() throws Exception {
+    tableName = TableName.valueOf(name.getMethodName());
+
+    // Create a table and write a record as the service user (hbase)
+    UserGroupInformation serviceUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+        "hbase/localhost", KEYTAB_FILE.getAbsolutePath());
+    clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() {
+      @Override public String run() throws Exception {
+        try (Connection conn = ConnectionFactory.createConnection(CONF);
+            Admin admin = conn.getAdmin();) {
+          admin.createTable(TableDescriptorBuilder
+              .newBuilder(tableName)
+              .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1"))
+              .build());
+
+          UTIL.waitTableAvailable(tableName);
+
+          try (Table t = conn.getTable(tableName)) {
+            Put p = new Put(Bytes.toBytes("r1"));
+            p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
+            t.put(p);
+          }
+
+          return admin.getClusterMetrics().getClusterId();
+        }
+      }
+    });
+
+    assertNotNull(clusterId);
+  }
+
+  @Test
+  public void testPositiveAuthentication() throws Exception {
+    final Configuration clientConf = new Configuration(CONF);
+    try (Connection conn = ConnectionFactory.createConnection(clientConf)) {
+      UserGroupInformation user1 = UserGroupInformation.createUserForTesting(
+          "user1", new String[0]);
+      user1.addToken(ShadeClientTokenUtil.obtainToken(conn, "user1", USER1_PASSWORD));
+      user1.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override public Void run() throws Exception {
+          try (Table t = conn.getTable(tableName)) {
+            Result r = t.get(new Get(Bytes.toBytes("r1")));
+            assertNotNull(r);
+            assertFalse("Should have read a non-empty Result", r.isEmpty());
+            final Cell cell = r.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
+            assertTrue("Unexpected value", CellUtil.matchingValue(cell, Bytes.toBytes("1")));
+
+            return null;
+          }
+        }
+      });
+    }
+  }
+
+  @Test(expected = DoNotRetryIOException.class)
+  public void testNegativeAuthentication() throws Exception {
+    // Validate that a user presenting the wrong password is rejected by our custom auth'n
+    final Configuration clientConf = new Configuration(CONF);
+    clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+    try (Connection conn = ConnectionFactory.createConnection(clientConf)) {
+      UserGroupInformation user1 = UserGroupInformation.createUserForTesting(
+          "user1", new String[0]);
+      user1.addToken(
+          ShadeClientTokenUtil.obtainToken(conn, "user1", "not a real password".toCharArray()));
+      user1.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override public Void run() throws Exception {
+          try (Connection conn = ConnectionFactory.createConnection(clientConf);
+              Table t = conn.getTable(tableName)) {
+            t.get(new Get(Bytes.toBytes("r1")));
+            fail("Should not successfully authenticate with HBase");
+            return null;
+          }
+        }
+      });
+    }
+  }
+}
diff --git a/hbase-examples/src/test/resources/log4j.properties b/hbase-examples/src/test/resources/log4j.properties
index c322699..4e5f014 100644
--- a/hbase-examples/src/test/resources/log4j.properties
+++ b/hbase-examples/src/test/resources/log4j.properties
@@ -66,3 +66,4 @@ log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN
 log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN
 # Enable this to get detailed connection error/retry logging.
 # log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE
+log4j.logger.org.apache.directory=WARN
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index 61f87d4..1632bb1 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -47,6 +47,12 @@
           <include>hbase-webapps/**</include>
         </includes>
       </resource>
+      <resource>
+        <directory>src/main/resources</directory>
+        <includes>
+          <include>**/**</include>
+        </includes>
+      </resource>
     </resources>
     <testResources>
       <!-- Our test artifact has different license info than our source/bin ones -->
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
index cff917f..b16b95e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.security.GeneralSecurityException;
+import java.util.Objects;
 import java.util.Properties;
 
 import org.apache.commons.crypto.cipher.CryptoCipherFactory;
@@ -45,11 +46,13 @@ import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
 import org.apache.hadoop.hbase.security.SaslStatus;
 import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
+import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
+import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
@@ -76,7 +79,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.TokenIdentifier;
 
 /** Reads calls from a connection and queues them for handling. */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
@@ -108,7 +110,7 @@ abstract class ServerRpcConnection implements Closeable {
   protected CompressionCodec compressionCodec;
   protected BlockingService service;
 
-  protected AuthMethod authMethod;
+  protected SaslServerAuthenticationProvider provider;
   protected boolean saslContextEstablished;
   protected boolean skipInitialSaslHandshake;
   private ByteBuffer unwrappedData;
@@ -127,10 +129,12 @@ abstract class ServerRpcConnection implements Closeable {
 
   protected User user = null;
   protected UserGroupInformation ugi = null;
+  protected SaslServerAuthenticationProviders saslProviders = null;
 
   public ServerRpcConnection(RpcServer rpcServer) {
     this.rpcServer = rpcServer;
     this.callCleanup = null;
+    this.saslProviders = SaslServerAuthenticationProviders.getInstance(rpcServer.getConf());
   }
 
   @Override
@@ -159,27 +163,10 @@ abstract class ServerRpcConnection implements Closeable {
 
   private String getFatalConnectionString(final int version, final byte authByte) {
     return "serverVersion=" + RpcServer.CURRENT_VERSION +
-    ", clientVersion=" + version + ", authMethod=" + authByte +
-    ", authSupported=" + (authMethod != null) + " from " + toString();
-  }
-
-  private UserGroupInformation getAuthorizedUgi(String authorizedId)
-      throws IOException {
-    UserGroupInformation authorizedUgi;
-    if (authMethod == AuthMethod.DIGEST) {
-      TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
-          this.rpcServer.secretManager);
-      authorizedUgi = tokenId.getUser();
-      if (authorizedUgi == null) {
-        throw new AccessDeniedException(
-            "Can't retrieve username from tokenIdentifier.");
-      }
-      authorizedUgi.addTokenIdentifier(tokenId);
-    } else {
-      authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
-    }
-    authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
-    return authorizedUgi;
+        ", clientVersion=" + version + ", authMethod=" + authByte +
+        // The provider may be null if we failed to parse the header of the request
+        ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName()) +
+        " from " + toString();
   }
 
   /**
@@ -362,9 +349,10 @@ abstract class ServerRpcConnection implements Closeable {
       try {
         if (saslServer == null) {
           saslServer =
-              new HBaseSaslRpcServer(authMethod, rpcServer.saslProps, rpcServer.secretManager);
+              new HBaseSaslRpcServer(
+                  rpcServer.getConf(), provider, rpcServer.saslProps, rpcServer.secretManager);
           RpcServer.LOG.debug("Created SASL server with mechanism={}",
-              authMethod.getMechanismName());
+              provider.getSaslAuthMethod().getAuthMethod());
         }
         RpcServer.LOG.debug("Read input token of size={} for processing by saslServer." +
             "evaluateResponse()", saslToken.limit());
@@ -386,7 +374,7 @@ abstract class ServerRpcConnection implements Closeable {
         String clientIP = this.toString();
         // attempting user could be null
         RpcServer.AUDITLOG
-            .warn(RpcServer.AUTH_FAILED_FOR + clientIP + ":" + saslServer.getAttemptingUser());
+            .warn("{} {}: {}", RpcServer.AUTH_FAILED_FOR, clientIP, saslServer.getAttemptingUser());
         throw e;
       }
       if (replyToken != null) {
@@ -400,11 +388,11 @@ abstract class ServerRpcConnection implements Closeable {
       if (saslServer.isComplete()) {
         String qop = saslServer.getNegotiatedQop();
         useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
-        ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
-        if (RpcServer.LOG.isDebugEnabled()) {
-          RpcServer.LOG.debug("SASL server context established. Authenticated client: " + ugi +
-              ". Negotiated QoP is " + qop);
-        }
+        ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(),
+            this.rpcServer.secretManager);
+        RpcServer.LOG.debug(
+            "SASL server context established. Authenticated client: {}. Negotiated QoP is {}",
+            ugi, qop);
         this.rpcServer.metrics.authenticationSuccess();
         RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
         saslContextEstablished = true;
@@ -473,7 +461,7 @@ abstract class ServerRpcConnection implements Closeable {
       // authorize real user. doAs is allowed only for simple or kerberos
       // authentication
       if (ugi != null && ugi.getRealUser() != null
-          && (authMethod != AuthMethod.DIGEST)) {
+          && provider.supportsProtocolAuthentication()) {
         ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
       }
       this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
@@ -512,7 +500,7 @@ abstract class ServerRpcConnection implements Closeable {
     if (!useSasl) {
       ugi = protocolUser;
       if (ugi != null) {
-        ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
+        ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
       }
       // audit logging for SASL authenticated users happens in saslReadAndProcess()
       if (authenticatedWithFallback) {
@@ -521,13 +509,13 @@ abstract class ServerRpcConnection implements Closeable {
       }
     } else {
       // user is authenticated
-      ugi.setAuthenticationMethod(authMethod.authenticationMethod);
+      ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod());
       //Now we check if this is a proxy user case. If the protocol user is
       //different from the 'user', it is a proxy user scenario. However,
       //this is not allowed if user authenticated with DIGEST.
       if ((protocolUser != null)
           && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
-        if (authMethod == AuthMethod.DIGEST) {
+        if (!provider.supportsProtocolAuthentication()) {
           // Not allowed to doAs if token authentication is used
           throw new AccessDeniedException("Authenticated user (" + ugi
               + ") doesn't match what the client claims to be ("
@@ -741,18 +729,20 @@ abstract class ServerRpcConnection implements Closeable {
     }
     int version = preambleBuffer.get() & 0xFF;
     byte authbyte = preambleBuffer.get();
-    this.authMethod = AuthMethod.valueOf(authbyte);
+
     if (version != SimpleRpcServer.CURRENT_VERSION) {
       String msg = getFatalConnectionString(version, authbyte);
       doBadPreambleHandling(msg, new WrongVersionException(msg));
       return false;
     }
-    if (authMethod == null) {
+    this.provider = this.saslProviders.selectProvider(authbyte);
+    if (this.provider == null) {
       String msg = getFatalConnectionString(version, authbyte);
       doBadPreambleHandling(msg, new BadAuthException(msg));
       return false;
     }
-    if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+    // TODO this is a wart while simple auth'n doesn't go through sasl.
+    if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) {
       if (this.rpcServer.allowFallbackToSimpleAuth) {
         this.rpcServer.metrics.authenticationFallback();
         authenticatedWithFallback = true;
@@ -762,21 +752,23 @@ abstract class ServerRpcConnection implements Closeable {
         return false;
       }
     }
-    if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+    if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) {
       doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
         null);
-      authMethod = AuthMethod.SIMPLE;
+      provider = saslProviders.getSimpleProvider();
       // client has already sent the initial Sasl message and we
       // should ignore it. Both client and server should fall back
       // to simple auth from now on.
       skipInitialSaslHandshake = true;
     }
-    if (authMethod != AuthMethod.SIMPLE) {
-      useSasl = true;
-    }
+    useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider);
     return true;
   }
 
+  boolean isSimpleAuthentication() { // throws NullPointerException if provider is still null
+    return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider;
+  }
+
   public abstract boolean isConnectionOpen();
 
   public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
index b392435..7ee24c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
@@ -20,27 +20,23 @@ package org.apache.hadoop.hbase.security;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.Map;
+import java.util.Optional;
 
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.provider.AttemptingUserProvidingSaslServer;
+import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A utility class that encapsulates SASL logic for RPC server. Copied from
@@ -51,47 +47,14 @@ public class HBaseSaslRpcServer {
 
   private static final Logger LOG = LoggerFactory.getLogger(HBaseSaslRpcServer.class);
 
+  private final AttemptingUserProvidingSaslServer serverWithProvider;
   private final SaslServer saslServer;
 
-  private UserGroupInformation attemptingUser; // user name before auth
-
-  public HBaseSaslRpcServer(AuthMethod method, Map<String, String> saslProps,
-      SecretManager<TokenIdentifier> secretManager) throws IOException {
-    switch (method) {
-      case DIGEST:
-        if (secretManager == null) {
-          throw new AccessDeniedException("Server is not configured to do DIGEST authentication.");
-        }
-        saslServer = Sasl.createSaslServer(AuthMethod.DIGEST.getMechanismName(), null,
-          SaslUtil.SASL_DEFAULT_REALM, saslProps, new SaslDigestCallbackHandler(secretManager));
-        break;
-      case KERBEROS:
-        UserGroupInformation current = UserGroupInformation.getCurrentUser();
-        String fullName = current.getUserName();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Kerberos principal name is " + fullName);
-        }
-        String[] names = SaslUtil.splitKerberosName(fullName);
-        if (names.length != 3) {
-          throw new AccessDeniedException(
-              "Kerberos principal name does NOT have the expected " + "hostname part: " + fullName);
-        }
-        try {
-          saslServer = current.doAs(new PrivilegedExceptionAction<SaslServer>() {
-            @Override
-            public SaslServer run() throws SaslException {
-              return Sasl.createSaslServer(AuthMethod.KERBEROS.getMechanismName(), names[0],
-                names[1], saslProps, new SaslGssCallbackHandler());
-            }
-          });
-        } catch (InterruptedException e) {
-          // should not happen
-          throw new AssertionError(e);
-        }
-        break;
-      default:
-        throw new IOException("Unknown authentication method " + method);
-    }
+  public HBaseSaslRpcServer(Configuration conf, SaslServerAuthenticationProvider provider,
+      Map<String, String> saslProps, SecretManager<TokenIdentifier> secretManager)
+          throws IOException { // NOTE(review): conf is accepted but not read here
+    serverWithProvider = provider.createServer(secretManager, saslProps);
+    saslServer = serverWithProvider.getServer(); // the SaslServer held by the provider's wrapper
   }
 
   public boolean isComplete() {
@@ -107,8 +70,12 @@ public class HBaseSaslRpcServer {
     SaslUtil.safeDispose(saslServer);
   }
 
-  public UserGroupInformation getAttemptingUser() {
-    return attemptingUser;
+  /**
+   * Returns the attempting user rendered as a string for audit logging, or "Unknown" when
+   * the provider has not recorded one.
+   */
+  public String getAttemptingUser() {
+    return serverWithProvider.getAttemptingUser().map(Object::toString).orElse("Unknown");
   }
 
   public byte[] wrap(byte[] buf, int off, int len) throws SaslException {
@@ -138,99 +105,4 @@ public class HBaseSaslRpcServer {
     }
     return tokenIdentifier;
   }
-
-  /** CallbackHandler for SASL DIGEST-MD5 mechanism */
-  private class SaslDigestCallbackHandler implements CallbackHandler {
-    private SecretManager<TokenIdentifier> secretManager;
-
-    public SaslDigestCallbackHandler(SecretManager<TokenIdentifier> secretManager) {
-      this.secretManager = secretManager;
-    }
-
-    private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
-      return SaslUtil.encodePassword(secretManager.retrievePassword(tokenid));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException {
-      NameCallback nc = null;
-      PasswordCallback pc = null;
-      AuthorizeCallback ac = null;
-      for (Callback callback : callbacks) {
-        if (callback instanceof AuthorizeCallback) {
-          ac = (AuthorizeCallback) callback;
-        } else if (callback instanceof NameCallback) {
-          nc = (NameCallback) callback;
-        } else if (callback instanceof PasswordCallback) {
-          pc = (PasswordCallback) callback;
-        } else if (callback instanceof RealmCallback) {
-          continue; // realm is ignored
-        } else {
-          throw new UnsupportedCallbackException(callback, "Unrecognized SASL DIGEST-MD5 Callback");
-        }
-      }
-      if (pc != null) {
-        TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
-        char[] password = getPassword(tokenIdentifier);
-        UserGroupInformation user = tokenIdentifier.getUser(); // may throw exception
-        attemptingUser = user;
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("SASL server DIGEST-MD5 callback: setting password " + "for client: " +
-              tokenIdentifier.getUser());
-        }
-        pc.setPassword(password);
-      }
-      if (ac != null) {
-        String authid = ac.getAuthenticationID();
-        String authzid = ac.getAuthorizationID();
-        if (authid.equals(authzid)) {
-          ac.setAuthorized(true);
-        } else {
-          ac.setAuthorized(false);
-        }
-        if (ac.isAuthorized()) {
-          if (LOG.isTraceEnabled()) {
-            String username = getIdentifier(authzid, secretManager).getUser().getUserName();
-            LOG.trace(
-              "SASL server DIGEST-MD5 callback: setting " + "canonicalized client ID: " + username);
-          }
-          ac.setAuthorizedID(authzid);
-        }
-      }
-    }
-  }
-
-  /** CallbackHandler for SASL GSSAPI Kerberos mechanism */
-  private static class SaslGssCallbackHandler implements CallbackHandler {
-
-    /** {@inheritDoc} */
-    @Override
-    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
-      AuthorizeCallback ac = null;
-      for (Callback callback : callbacks) {
-        if (callback instanceof AuthorizeCallback) {
-          ac = (AuthorizeCallback) callback;
-        } else {
-          throw new UnsupportedCallbackException(callback, "Unrecognized SASL GSSAPI Callback");
-        }
-      }
-      if (ac != null) {
-        String authid = ac.getAuthenticationID();
-        String authzid = ac.getAuthorizationID();
-        if (authid.equals(authzid)) {
-          ac.setAuthorized(true);
-        } else {
-          ac.setAuthorized(false);
-        }
-        if (ac.isAuthorized()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-              "SASL server GSSAPI callback: setting " + "canonicalized client ID: " + authzid);
-          }
-          ac.setAuthorizedID(authzid);
-        }
-      }
-    }
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/AttemptingUserProvidingSaslServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/AttemptingUserProvidingSaslServer.java
new file mode 100644
index 0000000..e6dc357
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/AttemptingUserProvidingSaslServer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import javax.security.sasl.SaslServer;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Wrapper around a SaslServer which provides the last user attempting to authenticate via SASL,
+ * if the server/mechanism allow figuring that out.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.AUTHENTICATION)
+@InterfaceStability.Evolving
+public class AttemptingUserProvidingSaslServer {
+  private final Supplier<UserGroupInformation> producer;
+  private final SaslServer saslServer;
+
+  public AttemptingUserProvidingSaslServer(
+      SaslServer saslServer, Supplier<UserGroupInformation> producer) {
+    this.saslServer = saslServer;
+    this.producer = producer;
+  }
+
+  public SaslServer getServer() {
+    return saslServer;
+  }
+
+  public Optional<UserGroupInformation> getAttemptingUser() {
+    return Optional.ofNullable(producer.get());
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslServerAuthenticationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslServerAuthenticationProvider.java
new file mode 100644
index 0000000..b3236d6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslServerAuthenticationProvider.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslServer;
+
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class DigestSaslServerAuthenticationProvider extends DigestSaslAuthenticationProvider
+    implements SaslServerAuthenticationProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DigestSaslServerAuthenticationProvider.class);
+
+  // One provider serves all connections; attempting-user state is created per SaslServer.
+
+  @Override
+  public AttemptingUserProvidingSaslServer createServer(
+      SecretManager<TokenIdentifier> secretManager,
+      Map<String, String> saslProps) throws IOException {
+    if (secretManager == null) {
+      throw new AccessDeniedException("Server is not configured to do DIGEST authentication.");
+    }
+    final AtomicReference<UserGroupInformation> attemptingUser = new AtomicReference<>();
+    final SaslServer server = Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null,
+      SaslUtil.SASL_DEFAULT_REALM, saslProps,
+      new SaslDigestCallbackHandler(secretManager, attemptingUser));
+    return new AttemptingUserProvidingSaslServer(server, attemptingUser::get);
+  }
+
+  /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+  private static class SaslDigestCallbackHandler implements CallbackHandler {
+    private final SecretManager<TokenIdentifier> secretManager;
+    private final AtomicReference<UserGroupInformation> attemptingUser;
+
+    public SaslDigestCallbackHandler(SecretManager<TokenIdentifier> secretManager,
+        AtomicReference<UserGroupInformation> attemptingUser) {
+      this.secretManager = secretManager;
+      this.attemptingUser = attemptingUser;
+    }
+
+    private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
+      return SaslUtil.encodePassword(secretManager.retrievePassword(tokenid));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          continue; // realm is ignored
+        } else {
+          throw new UnsupportedCallbackException(callback, "Unrecognized SASL DIGEST-MD5 Callback");
+        }
+      }
+      if (pc != null) {
+        TokenIdentifier tokenIdentifier = HBaseSaslRpcServer.getIdentifier(
+            nc.getDefaultName(), secretManager);
+        attemptingUser.set(tokenIdentifier.getUser());
+        char[] password = getPassword(tokenIdentifier);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("SASL server DIGEST-MD5 callback: setting password for client: {}",
+              tokenIdentifier.getUser());
+        }
+        pc.setPassword(password);
+      }
+      if (ac != null) {
+        // The authentication ID is the identifier (username) of the user who authenticated via
+        // SASL (the one who provided credentials). The authorization ID is who the remote user
+        // "asked" to be once they authenticated. This is akin to the UGI/JAAS "doAs" notion, e.g.
+        // authentication ID is the "real" user and authorization ID is the "proxy" user.
+        //
+        // For DelegationTokens: we do not expect any remote user with a delegation token to execute
+        // any RPCs as a user other than themselves. We disallow all cases where the real user
+        // does not match the user that the remote client asked to execute the request as.
+        String authenticatedUserId = ac.getAuthenticationID();
+        String userRequestedToExecuteAs = ac.getAuthorizationID();
+        if (authenticatedUserId.equals(userRequestedToExecuteAs)) {
+          ac.setAuthorized(true);
+          if (LOG.isTraceEnabled()) {
+            String username = HBaseSaslRpcServer.getIdentifier(
+                userRequestedToExecuteAs, secretManager).getUser().getUserName();
+            LOG.trace(
+              "SASL server DIGEST-MD5 callback: setting " + "canonicalized client ID: " + username);
+          }
+          ac.setAuthorizedID(userRequestedToExecuteAs);
+        } else {
+          ac.setAuthorized(false);
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean supportsProtocolAuthentication() {
+    return false;
+  }
+
+  @Override
+  public UserGroupInformation getAuthorizedUgi(String authzId,
+      SecretManager<TokenIdentifier> secretManager) throws IOException {
+    UserGroupInformation authorizedUgi;
+    TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authzId, secretManager);
+    authorizedUgi = tokenId.getUser();
+    if (authorizedUgi == null) {
+      throw new AccessDeniedException(
+          "Can't retrieve username from tokenIdentifier.");
+    }
+    authorizedUgi.addTokenIdentifier(tokenId);
+    authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod());
+    return authorizedUgi;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslServerAuthenticationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslServerAuthenticationProvider.java
new file mode 100644
index 0000000..8a542c6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslServerAuthenticationProvider.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class GssSaslServerAuthenticationProvider extends GssSaslAuthenticationProvider
+    implements SaslServerAuthenticationProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      GssSaslServerAuthenticationProvider.class);
+
+  @Override
+  public AttemptingUserProvidingSaslServer createServer(
+      SecretManager<TokenIdentifier> secretManager,
+      Map<String, String> saslProps) throws IOException {
+    UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    String fullName = current.getUserName();
+    LOG.debug("Server's Kerberos principal name is {}", fullName);
+    String[] names = SaslUtil.splitKerberosName(fullName);
+    if (names.length != 3) {
+      throw new AccessDeniedException(
+          "Kerberos principal does NOT contain an instance (hostname): " + fullName);
+    }
+    try {
+      return current.doAs(new PrivilegedExceptionAction<AttemptingUserProvidingSaslServer>() {
+        @Override
+        public AttemptingUserProvidingSaslServer run() throws SaslException {
+          return new AttemptingUserProvidingSaslServer(Sasl.createSaslServer(
+              getSaslAuthMethod().getSaslMechanism(), names[0], names[1], saslProps,
+              new SaslGssCallbackHandler()), () -> null);
+        }
+      });
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt flag for callers
+      throw new RuntimeException("Failed to construct GSS SASL server", e);
+    }
+  }
+
+  /** CallbackHandler for SASL GSSAPI Kerberos mechanism */
+  private static class SaslGssCallbackHandler implements CallbackHandler {
+
+    /** {@inheritDoc} */
+    @Override
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback, "Unrecognized SASL GSSAPI Callback");
+        }
+      }
+      if (ac != null) {
+        String authid = ac.getAuthenticationID();
+        String authzid = ac.getAuthorizationID();
+        if (authid.equals(authzid)) {
+          ac.setAuthorized(true);
+        } else {
+          ac.setAuthorized(false);
+        }
+        if (ac.isAuthorized()) {
+          LOG.debug("SASL server GSSAPI callback: setting canonicalized client ID: {}", authzid);
+          ac.setAuthorizedID(authzid);
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean supportsProtocolAuthentication() {
+    return true;
+  }
+
+  @Override
+  public UserGroupInformation getAuthorizedUgi(String authzId,
+      SecretManager<TokenIdentifier> secretManager) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(authzId);
+    ugi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod());
+    return ugi;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/SaslServerAuthenticationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/SaslServerAuthenticationProvider.java
new file mode 100644
index 0000000..3487cfc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/SaslServerAuthenticationProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Encapsulates the server-side logic to authenticate a client over SASL. Tied one-to-one to
+ * a single client authentication implementation.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.AUTHENTICATION)
+@InterfaceStability.Evolving
+public interface SaslServerAuthenticationProvider extends SaslAuthenticationProvider {
+
+  /**
+   * Allows implementations to initialize themselves, prior to creating a server.
+   */
+  default void init(Configuration conf) throws IOException {}
+
+  /**
+   * Creates the SaslServer to accept incoming SASL authentication requests.
+   */
+  AttemptingUserProvidingSaslServer createServer(SecretManager<TokenIdentifier> secretManager,
+      Map<String, String> saslProps) throws IOException;
+
+  boolean supportsProtocolAuthentication();
+
+  UserGroupInformation getAuthorizedUgi(String authzId,
+      SecretManager<TokenIdentifier> secretManager) throws IOException;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/SaslServerAuthenticationProviders.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/SaslServerAuthenticationProviders.java
new file mode 100644
index 0000000..d1c22b7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/SaslServerAuthenticationProviders.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public final class SaslServerAuthenticationProviders {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SaslServerAuthenticationProviders.class);
+
+  public static final String EXTRA_PROVIDERS_KEY = "hbase.server.sasl.provider.extras";
+  private static final AtomicReference<SaslServerAuthenticationProviders> holder =
+      new AtomicReference<>();
+
+  private final HashMap<Byte, SaslServerAuthenticationProvider> providers;
+
+  private SaslServerAuthenticationProviders(Configuration conf,
+      HashMap<Byte, SaslServerAuthenticationProvider> providers) {
+    this.providers = providers;
+  }
+
+  /**
+   * Returns the number of registered providers.
+   */
+  public int getNumRegisteredProviders() {
+    return providers.size();
+  }
+
+  /**
+   * Returns a singleton instance of {@link SaslServerAuthenticationProviders}.
+   */
+  public static SaslServerAuthenticationProviders getInstance(Configuration conf) {
+    SaslServerAuthenticationProviders providers = holder.get();
+    if (null == providers) {
+      synchronized (holder) {
+        // Someone else beat us here
+        providers = holder.get();
+        if (null != providers) {
+          return providers;
+        }
+
+        providers = createProviders(conf);
+        holder.set(providers);
+      }
+    }
+    return providers;
+  }
+
+  /**
+   * Removes the cached singleton instance of {@link SaslServerAuthenticationProviders}.
+   */
+  public static void reset() {
+    synchronized (holder) {
+      holder.set(null);
+    }
+  }
+
+  /**
+   * Adds the given provider into the map of providers if a mapping for the auth code does not
+   * already exist in the map.
+   */
+  static void addProviderIfNotExists(SaslServerAuthenticationProvider provider,
+      HashMap<Byte,SaslServerAuthenticationProvider> providers) {
+    final byte newProviderAuthCode = provider.getSaslAuthMethod().getCode();
+    final SaslServerAuthenticationProvider alreadyRegisteredProvider = providers.get(
+        newProviderAuthCode);
+    if (alreadyRegisteredProvider != null) {
+      throw new RuntimeException("Trying to load SaslServerAuthenticationProvider "
+          + provider.getClass() + ", but "+ alreadyRegisteredProvider.getClass()
+          + " is already registered with the same auth code");
+    }
+    providers.put(newProviderAuthCode, provider);
+  }
+
+  /**
+   * Adds any providers defined in the configuration.
+   */
+  static void addExtraProviders(Configuration conf,
+      HashMap<Byte,SaslServerAuthenticationProvider> providers) {
+    for (String implName : conf.getStringCollection(EXTRA_PROVIDERS_KEY)) {
+      Class<?> clz;
+      try {
+        clz = Class.forName(implName);
+      } catch (ClassNotFoundException e) {
+        LOG.warn("Failed to find SaslServerAuthenticationProvider class {}", implName, e);
+        continue;
+      }
+
+      if (!SaslServerAuthenticationProvider.class.isAssignableFrom(clz)) {
+        LOG.warn("Server authentication class {} is not an instance of "
+            + "SaslServerAuthenticationProvider", clz);
+        continue;
+      }
+
+      try {
+        SaslServerAuthenticationProvider provider =
+            (SaslServerAuthenticationProvider) clz.newInstance();
+        addProviderIfNotExists(provider, providers);
+      } catch (InstantiationException | IllegalAccessException e) {
+        LOG.warn("Failed to instantiate {}", clz, e);
+      }
+    }
+  }
+
+  /**
+   * Loads server authentication providers from the classpath and configuration, and then creates
+   * the SaslServerAuthenticationProviders instance.
+   */
+  static SaslServerAuthenticationProviders createProviders(Configuration conf) {
+    ServiceLoader<SaslServerAuthenticationProvider> loader =
+        ServiceLoader.load(SaslServerAuthenticationProvider.class);
+    HashMap<Byte,SaslServerAuthenticationProvider> providers = new HashMap<>();
+    for (SaslServerAuthenticationProvider provider : loader) {
+      addProviderIfNotExists(provider, providers);
+    }
+
+    addExtraProviders(conf, providers);
+
+    if (LOG.isTraceEnabled()) {
+      String loadedProviders = providers.values().stream()
+          .map((provider) -> provider.getClass().getName())
+          .collect(Collectors.joining(", "));
+      if (loadedProviders.isEmpty()) {
+        loadedProviders = "None!";
+      }
+      LOG.trace("Found SaslServerAuthenticationProviders {}", loadedProviders);
+    }
+
+    // Initialize the providers once, before we get into the RPC path.
+    providers.forEach((b,provider) -> {
+      try {
+        // Give them a copy, just to make sure there is no funny-business going on.
+        provider.init(new Configuration(conf));
+      } catch (IOException e) {
+        LOG.error("Failed to initialize {}", provider.getClass(), e);
+        throw new RuntimeException(
+            "Failed to initialize " + provider.getClass().getName(), e);
+      }
+    });
+
+    return new SaslServerAuthenticationProviders(conf, providers);
+  }
+
+  /**
+   * Selects the appropriate SaslServerAuthenticationProvider from those available. If there is no
+   * matching provider for the given {@code authByte}, this method will return null.
+   */
+  public SaslServerAuthenticationProvider selectProvider(byte authByte) {
+    return providers.get(Byte.valueOf(authByte));
+  }
+
+  /**
+   * Extracts the SIMPLE authentication provider.
+   */
+  public SaslServerAuthenticationProvider getSimpleProvider() {
+    Optional<SaslServerAuthenticationProvider> opt = providers.values()
+        .stream()
+        .filter((p) -> p instanceof SimpleSaslServerAuthenticationProvider)
+        .findFirst();
+    if (!opt.isPresent()) {
+      throw new RuntimeException("SIMPLE authentication provider not available when it should be");
+    }
+    return opt.get();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslServerAuthenticationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslServerAuthenticationProvider.java
new file mode 100644
index 0000000..ed7bf4c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslServerAuthenticationProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class SimpleSaslServerAuthenticationProvider extends SimpleSaslAuthenticationProvider
+    implements SaslServerAuthenticationProvider {
+
+  @Override
+  public AttemptingUserProvidingSaslServer createServer(
+      SecretManager<TokenIdentifier> secretManager,
+      Map<String, String> saslProps) throws IOException {
+    throw new RuntimeException("HBase SIMPLE authentication doesn't use SASL");
+  }
+
+  @Override
+  public boolean supportsProtocolAuthentication() {
+    return true;
+  }
+
+  @Override
+  public UserGroupInformation getAuthorizedUgi(String authzId,
+      SecretManager<TokenIdentifier> secretManager) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(authzId);
+    ugi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod());
+    return ugi;
+  }
+}
diff --git a/hbase-server/src/main/resources/META-INF/services/org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider b/hbase-server/src/main/resources/META-INF/services/org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider
new file mode 100644
index 0000000..1c7af49
--- /dev/null
+++ b/hbase-server/src/main/resources/META-INF/services/org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.hadoop.hbase.security.provider.DigestSaslServerAuthenticationProvider
+org.apache.hadoop.hbase.security.provider.GssSaslServerAuthenticationProvider
+org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider
+
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/TestCustomSaslAuthenticationProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/TestCustomSaslAuthenticationProvider.java
new file mode 100644
index 0000000..957810e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/TestCustomSaslAuthenticationProvider.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.SecureTestCluster;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
+
+/**
+ * Tests the pluggable authentication framework with SASL using a contrived authentication system.
+ *
+ * This test holds a "user database" in memory as a hashmap. Clients provide their password
+ * in the client Hadoop configuration. The servers validate this password via the "user database".
+ */
+@Category({MediumTests.class, SecurityTests.class})
+public class TestCustomSaslAuthenticationProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestCustomSaslAuthenticationProvider.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestCustomSaslAuthenticationProvider.class);
+
+  private static final Map<String,String> USER_DATABASE = createUserDatabase();
+
+  private static final String USER1_PASSWORD = "foobarbaz";
+  private static final String USER2_PASSWORD = "bazbarfoo";
+
+  private static Map<String,String> createUserDatabase() {
+    Map<String,String> db = new ConcurrentHashMap<>();
+    db.put("user1", USER1_PASSWORD);
+    db.put("user2", USER2_PASSWORD);
+    return db;
+  }
+
+  /**
+   * Looks up the password stored for {@code user} in the in-memory user database.
+   *
+   * @param user the user name to look up
+   * @return the stored password, never {@code null}
+   * @throws IllegalStateException if the database has no entry for {@code user}
+   */
+  public static String getPassword(String user) {
+    final String stored = USER_DATABASE.get(user);
+    if (stored != null) {
+      return stored;
+    }
+    throw new IllegalStateException("Cannot request password for a user that doesn't exist");
+  }
+
+  /**
+   * Token identifier for the custom password-based auth'n method used by this test. It
+   * carries only a user name and has its own kind, distinct from the TokenIdentifier used
+   * for delegation tokens.
+   */
+  public static class PasswordAuthTokenIdentifier extends TokenIdentifier {
+    public static final Text PASSWORD_AUTH_TOKEN = new Text("HBASE_PASSWORD_TEST_TOKEN");
+    private String username;
+
+    /** Creates an identifier with no user name; populate it via {@link #readFields}. */
+    public PasswordAuthTokenIdentifier() {}
+
+    /** Creates an identifier for the given user name. */
+    public PasswordAuthTokenIdentifier(String username) {
+      this.username = username;
+    }
+
+    /** Reads the user name, as written by {@link #write}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      username = WritableUtils.readString(in);
+    }
+
+    /** Writes the user name as the only serialized field. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeString(out, username);
+    }
+
+    @Override
+    public Text getKind() {
+      return PASSWORD_AUTH_TOKEN;
+    }
+
+    /**
+     * @return a remote-user UGI for the user name, or {@code null} when the user name is
+     *     null or empty
+     */
+    @Override
+    public UserGroupInformation getUser() {
+      final boolean hasName = username != null && !username.isEmpty();
+      return hasName ? UserGroupInformation.createRemoteUser(username) : null;
+    }
+  }
+
+  /**
+   * Builds a token whose identifier is a {@link PasswordAuthTokenIdentifier} for
+   * {@code username}, whose password bytes are {@code password}, and whose service is
+   * {@code clusterId}.
+   */
+  public static Token<? extends TokenIdentifier> createPasswordToken(
+      String username, String password, String clusterId) {
+    final PasswordAuthTokenIdentifier identifier = new PasswordAuthTokenIdentifier(username);
+    final byte[] identifierBytes = identifier.getBytes();
+    final byte[] passwordBytes = Bytes.toBytes(password);
+    return new Token<>(identifierBytes, passwordBytes, identifier.getKind(),
+        new Text(clusterId));
+  }
+
+  /**
+   * Client provider that finds custom Token in the user's UGI and authenticates with the server
+   * via DIGEST-MD5 using that password.
+   */
+  public static class InMemoryClientProvider extends AbstractSaslClientAuthenticationProvider {
+    public static final String MECHANISM = "DIGEST-MD5";
+    public static final SaslAuthMethod SASL_AUTH_METHOD = new SaslAuthMethod(
+        "IN_MEMORY", (byte)42, MECHANISM, AuthenticationMethod.TOKEN);
+
+    @Override
+    public SaslClient createClient(Configuration conf, InetAddress serverAddr,
+        SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
+        Map<String, String> saslProps) throws IOException {
+      return Sasl.createSaslClient(new String[] { MECHANISM }, null, null,
+          SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token));
+    }
+
+    public Optional<Token<? extends TokenIdentifier>> findToken(User user) {
+      List<Token<? extends TokenIdentifier>> tokens = user.getTokens().stream()
+        .filter((token) -> token.getKind().equals(PasswordAuthTokenIdentifier.PASSWORD_AUTH_TOKEN))
+        .collect(Collectors.toList());
+      if (tokens.isEmpty()) {
+        return Optional.empty();
+      }
+      if (tokens.size() > 1) {
+        throw new IllegalStateException("Cannot handle more than one PasswordAuthToken");
+      }
+      return Optional.of(tokens.get(0));
+    }
+
+    @Override
+    public SaslAuthMethod getSaslAuthMethod() {
+      return SASL_AUTH_METHOD;
+    }
+
+    /**
+     * Sasl CallbackHandler which extracts information from our custom token and places
+     * it into the Sasl objects.
+     */
+    public class InMemoryClientProviderCallbackHandler implements CallbackHandler {
+      private final Token<? extends TokenIdentifier> token;
+      public InMemoryClientProviderCallbackHandler(Token<? extends TokenIdentifier> token) {
+        this.token = token;
+      }
+
+      @Override
+      public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        RealmCallback rc = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof RealmChoiceCallback) {
+            continue;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            rc = (RealmCallback) callback;
+          } else {
+            throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
+          }
+        }
+        if (nc != null) {
+          nc.setName(SaslUtil.encodeIdentifier(token.getIdentifier()));
+        }
+        if (pc != null) {
+          pc.setPassword(SaslUtil.encodePassword(token.getPassword()));
+        }
+        if (rc != null) {
+          rc.setText(rc.getDefaultText());
+        }
+      }
+    }
+
+    @Override
+    public UserInformation getUserInfo(User user) {
+      return null;
+    }
+  }
+
+  /**
+   * Server provider which validates credentials from an in-memory database.
+   */
+  public static class InMemoryServerProvider extends InMemoryClientProvider
+      implements SaslServerAuthenticationProvider {
+
+    /**
+     * Creates a SASL server for this provider's mechanism whose callbacks are answered by
+     * {@link InMemoryServerProviderCallbackHandler}. The attempting-user supplier always
+     * yields {@code null}.
+     */
+    @Override
+    public AttemptingUserProvidingSaslServer createServer(
+        SecretManager<TokenIdentifier> secretManager,
+        Map<String, String> saslProps) throws IOException {
+      return new AttemptingUserProvidingSaslServer(
+          Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null,
+              SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryServerProviderCallbackHandler()),
+        () -> null);
+    }
+
+    /**
+     * Pulls the correct password for the user who started the SASL handshake so that SASL
+     * can validate that the user provided the right password.
+     */
+    private class InMemoryServerProviderCallbackHandler implements CallbackHandler {
+
+      /**
+       * Answers the password callback with the stored password of the user named in the
+       * name callback, and authorizes the exchange only when the authentication ID and
+       * the authorization ID are equal.
+       *
+       * @throws InvalidToken if the identifier cannot be de-serialized or carries no user
+       * @throws UnsupportedCallbackException if an unrecognized callback is presented
+       */
+      @Override
+      public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        AuthorizeCallback ac = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof AuthorizeCallback) {
+            ac = (AuthorizeCallback) callback;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            continue; // realm is ignored
+          } else {
+            throw new UnsupportedCallbackException(callback, "Unrecognized SASL Callback");
+          }
+        }
+        if (nc != null && pc != null) {
+          byte[] encodedId = SaslUtil.decodeIdentifier(nc.getDefaultName());
+          PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier();
+          try {
+            id.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
+          } catch (IOException e) {
+            throw (InvalidToken) new InvalidToken(
+                "Can't de-serialize tokenIdentifier").initCause(e);
+          }
+          // getUser() returns null for a missing/empty user name; reject that as an
+          // invalid token rather than failing with a NullPointerException.
+          UserGroupInformation claimedUser = id.getUser();
+          if (claimedUser == null) {
+            throw new InvalidToken("Can't retrieve username from tokenIdentifier.");
+          }
+          char[] actualPassword = SaslUtil.encodePassword(
+              Bytes.toBytes(getPassword(claimedUser.getUserName())));
+          pc.setPassword(actualPassword);
+        }
+        if (ac != null) {
+          String authid = ac.getAuthenticationID();
+          String authzid = ac.getAuthorizationID();
+          ac.setAuthorized(authid.equals(authzid));
+          if (ac.isAuthorized()) {
+            ac.setAuthorizedID(authzid);
+          }
+        }
+      }
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean supportsProtocolAuthentication() {
+      return false;
+    }
+
+    /**
+     * Decodes {@code authzId} back into a {@link PasswordAuthTokenIdentifier} and returns
+     * the UGI of the user it names, with the identifier attached and the authentication
+     * method set from {@code getSaslAuthMethod()}.
+     *
+     * @param authzId the encoded token identifier
+     * @param secretManager unused
+     * @throws IOException if the identifier cannot be de-serialized
+     * @throws AccessDeniedException if the identifier carries no user name
+     */
+    @Override
+    public UserGroupInformation getAuthorizedUgi(String authzId,
+        SecretManager<TokenIdentifier> secretManager) throws IOException {
+      UserGroupInformation authorizedUgi;
+      byte[] encodedId = SaslUtil.decodeIdentifier(authzId);
+      PasswordAuthTokenIdentifier tokenId = new PasswordAuthTokenIdentifier();
+      try {
+        tokenId.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
+      } catch (IOException e) {
+        throw new IOException("Can't de-serialize PasswordAuthTokenIdentifier", e);
+      }
+      authorizedUgi = tokenId.getUser();
+      if (authorizedUgi == null) {
+        throw new AccessDeniedException(
+            "Can't retrieve username from tokenIdentifier.");
+      }
+      authorizedUgi.addTokenIdentifier(tokenId);
+      authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod());
+      return authorizedUgi;
+    }
+  }
+
+  /**
+   * Selector which prefers the {@link InMemoryClientProvider} whenever the user holds its
+   * custom token, and otherwise defers to whatever {@link BuiltInProviderSelector} chooses.
+   */
+  public static class InMemoryProviderSelector extends BuiltInProviderSelector {
+    private InMemoryClientProvider inMemoryProvider;
+
+    /**
+     * Configures the built-in selection and remembers the {@link InMemoryClientProvider}
+     * found among {@code providers}.
+     *
+     * @throws RuntimeException if {@code providers} contains no InMemoryClientProvider
+     */
+    @Override
+    public void configure(Configuration conf,
+        Collection<SaslClientAuthenticationProvider> providers) {
+      super.configure(conf, providers);
+      inMemoryProvider = providers.stream()
+        .filter(InMemoryClientProvider.class::isInstance)
+        .map(InMemoryClientProvider.class::cast)
+        .findAny()
+        .orElseThrow(
+          () -> new RuntimeException("InMemoryClientProvider not found in available providers: "
+              + providers));
+    }
+
+    /**
+     * @return the in-memory provider paired with the user's password token when that token
+     *     is present; otherwise the pair selected by the superclass
+     */
+    @Override
+    public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> selectProvider(
+        String clusterId, User user) {
+      // Always ask the superclass first, even if its answer ends up unused.
+      final Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> fallback =
+          super.selectProvider(clusterId, user);
+
+      final Optional<Token<? extends TokenIdentifier>> passwordToken =
+          inMemoryProvider.findToken(user);
+      if (!passwordToken.isPresent()) {
+        LOG.info("InMemoryClientProvider not usable, falling back to {}", fallback);
+        return fallback;
+      }
+
+      LOG.info("Using InMemoryClientProvider");
+      return new Pair<>(inMemoryProvider, passwordToken.get());
+    }
+  }
+
+  static LocalHBaseCluster createCluster(HBaseTestingUtility util, File keytabFile,
+      MiniKdc kdc) throws Exception {
+    String servicePrincipal = "hbase/localhost";
+    String spnegoPrincipal = "HTTP/localhost";
+    kdc.createPrincipal(keytabFile, servicePrincipal);
+    util.startMiniZKCluster();
+
+    HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(),
+        servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm());
+    HBaseKerberosUtils.setSSLConfiguration(util, SecureTestCluster.class);
+
+    util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        TokenProvider.class.getName());
+    util.startMiniDFSCluster(1);
+    Path rootdir = util.getDataTestDirOnTestFS("TestGenerateDelegationToken");
+    FSUtils.setRootDir(util.getConfiguration(), rootdir);
+    LocalHBaseCluster cluster = new LocalHBaseCluster(util.getConfiguration(), 1);
+    return cluster;
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final Configuration CONF = UTIL.getConfiguration();
+  private static LocalHBaseCluster CLUSTER;
+  private static File KEYTAB_FILE;
+
+  /**
+   * Starts a MiniKdc and a secured local cluster configured with the blocking RPC
+   * client/simple RPC server and with this test's custom providers and selector.
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    final String keytabPath = UTIL.getDataTestDir("keytab").toUri().getPath();
+    KEYTAB_FILE = new File(keytabPath);
+    final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE);
+
+    // Switch back to NIO for now.
+    final String rpcClientImpl = BlockingRpcClient.class.getName();
+    final String rpcServerImpl = SimpleRpcServer.class.getName();
+    CONF.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
+    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
+
+    // Register the test implementations through configuration rather than service loader
+    // entries, which might inadvertently get them loaded on a real cluster.
+    CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY,
+        InMemoryClientProvider.class.getName());
+    CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY,
+        InMemoryServerProvider.class.getName());
+    CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY,
+        InMemoryProviderSelector.class.getName());
+
+    CLUSTER = createCluster(UTIL, KEYTAB_FILE, kdc);
+    CLUSTER.startup();
+  }
+
+  /**
+   * Shuts down the HBase cluster and then the mini DFS and mini ZooKeeper clusters that
+   * were started for it. Each later step still runs if an earlier one throws, so a failed
+   * shutdown does not leave the remaining services running.
+   */
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    try {
+      if (CLUSTER != null) {
+        CLUSTER.shutdown();
+        CLUSTER = null;
+      }
+    } finally {
+      try {
+        // The DFS cluster is started while creating the cluster and must be stopped too.
+        UTIL.shutdownMiniDFSCluster();
+      } finally {
+        UTIL.shutdownMiniZKCluster();
+      }
+    }
+  }
+
+  @Rule
+  public TestName name = new TestName();
+  TableName tableName;
+  String clusterId;
+
+  /**
+   * Creates a table named after the running test method and writes one cell (row "r1",
+   * column "f1:q1", value "1") while logged in from the keytab as "hbase/localhost".
+   * Also captures the cluster ID for use by the test.
+   */
+  @Before
+  public void createTable() throws Exception {
+    tableName = TableName.valueOf(name.getMethodName());
+
+    // All setup work runs as the service user (hbase)
+    UserGroupInformation serviceUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+        "hbase/localhost", KEYTAB_FILE.getAbsolutePath());
+    clusterId = serviceUgi.doAs((PrivilegedExceptionAction<String>) () -> {
+      try (Connection connection = ConnectionFactory.createConnection(CONF);
+          Admin admin = connection.getAdmin()) {
+        admin.createTable(TableDescriptorBuilder
+            .newBuilder(tableName)
+            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1"))
+            .build());
+
+        UTIL.waitTableAvailable(tableName);
+
+        try (Table table = connection.getTable(tableName)) {
+          Put put = new Put(Bytes.toBytes("r1"));
+          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
+          table.put(put);
+        }
+
+        return admin.getClusterMetrics().getClusterId();
+      }
+    });
+
+    assertNotNull(clusterId);
+  }
+
+  /**
+   * A user holding a password token with the correct password must be able to read back
+   * the record written during setup.
+   */
+  @Test
+  public void testPositiveAuthentication() throws Exception {
+    final Configuration clientConf = new Configuration(CONF);
+    final UserGroupInformation user1 = UserGroupInformation.createUserForTesting(
+        "user1", new String[0]);
+    user1.addToken(createPasswordToken("user1", USER1_PASSWORD, clusterId));
+    user1.doAs((PrivilegedExceptionAction<Void>) () -> {
+      try (Connection connection = ConnectionFactory.createConnection(clientConf);
+          Table table = connection.getTable(tableName)) {
+        final Result result = table.get(new Get(Bytes.toBytes("r1")));
+        assertNotNull(result);
+        assertFalse("Should have read a non-empty Result", result.isEmpty());
+        final Cell latest =
+            result.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
+        assertTrue("Unexpected value", CellUtil.matchingValue(latest, Bytes.toBytes("1")));
+
+        return null;
+      }
+    });
+  }
+
+  /**
+   * A user holding a password token with the wrong password must not be able to read the
+   * record; the read is expected to end in a {@link RetriesExhaustedException}.
+   */
+  @Test(expected = RetriesExhaustedException.class)
+  public void testNegativeAuthentication() throws Exception {
+    final Configuration clientConf = new Configuration(CONF);
+    // Limit client retries to 3 for this connection.
+    clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+    final UserGroupInformation user1 = UserGroupInformation.createUserForTesting(
+        "user1", new String[0]);
+    user1.addToken(createPasswordToken("user1", "definitely not the password", clusterId));
+    user1.doAs((PrivilegedExceptionAction<Void>) () -> {
+      try (Connection connection = ConnectionFactory.createConnection(clientConf);
+          Table table = connection.getTable(tableName)) {
+        table.get(new Get(Bytes.toBytes("r1")));
+        fail("Should not successfully authenticate with HBase");
+        return null;
+      }
+    });
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/TestSaslServerAuthenticationProviders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/TestSaslServerAuthenticationProviders.java
new file mode 100644
index 0000000..aa7b834
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/TestSaslServerAuthenticationProviders.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class, SecurityTests.class})
+public class TestSaslServerAuthenticationProviders {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSaslServerAuthenticationProviders.class);
+
+  /**
+   * Resets {@link SaslServerAuthenticationProviders} before each test so that no state
+   * left by an earlier test is visible to the next one.
+   */
+  @Before
+  public void reset() {
+    // Clear out any potentially bogus state from the providers class
+    SaslServerAuthenticationProviders.reset();
+  }
+
+  /**
+   * Registering a second provider of the same type must leave the first registration
+   * untouched: still exactly one entry, and that entry is the original instance.
+   */
+  @Test
+  public void testCannotAddTheSameProviderTwice() {
+    HashMap<Byte,SaslServerAuthenticationProvider> registeredProviders = new HashMap<>();
+    SimpleSaslServerAuthenticationProvider p1 = new SimpleSaslServerAuthenticationProvider();
+    SimpleSaslServerAuthenticationProvider p2 = new SimpleSaslServerAuthenticationProvider();
+
+    SaslServerAuthenticationProviders.addProviderIfNotExists(p1, registeredProviders);
+    assertEquals(1, registeredProviders.size());
+
+    try {
+      SaslServerAuthenticationProviders.addProviderIfNotExists(p2, registeredProviders);
+    } catch (RuntimeException ignored) {
+      // Deliberately ignored: a rejected duplicate is acceptable here. What matters is the
+      // resulting state of the map, which is verified below.
+    }
+
+    assertEquals("Duplicate provider must not add an entry", 1, registeredProviders.size());
+    assertSame("Expected the original provider to be present", p1,
+        registeredProviders.entrySet().iterator().next().getValue());
+  }
+
+  /**
+   * {@code getInstance} must hand back the same object until {@code reset} is called, after
+   * which a new object with the same number of registered providers is created.
+   */
+  @Test
+  public void testInstanceIsCached() {
+    final Configuration conf = HBaseConfiguration.create();
+    final SaslServerAuthenticationProviders first =
+        SaslServerAuthenticationProviders.getInstance(conf);
+    final SaslServerAuthenticationProviders second =
+        SaslServerAuthenticationProviders.getInstance(conf);
+    assertSame(first, second);
+
+    SaslServerAuthenticationProviders.reset();
+
+    final SaslServerAuthenticationProviders afterReset =
+        SaslServerAuthenticationProviders.getInstance(conf);
+    assertNotSame(first, afterReset);
+    assertEquals(first.getNumRegisteredProviders(), afterReset.getNumRegisteredProviders());
+  }
+
+  /**
+   * A provider registered through the extra-providers configuration key must be selectable
+   * by its ID and must report that it has been initialized.
+   */
+  @Test
+  public void instancesAreInitialized() {
+    final Configuration conf = HBaseConfiguration.create();
+    final String extraProvider = InitCheckingSaslServerAuthenticationProvider.class.getName();
+    conf.set(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY, extraProvider);
+
+    final SaslServerAuthenticationProvider selected =
+        SaslServerAuthenticationProviders.getInstance(conf)
+            .selectProvider(InitCheckingSaslServerAuthenticationProvider.ID);
+    assertEquals(InitCheckingSaslServerAuthenticationProvider.class, selected.getClass());
+
+    final InitCheckingSaslServerAuthenticationProvider initChecking =
+        (InitCheckingSaslServerAuthenticationProvider) selected;
+    assertTrue("Provider was not inititalized", initChecking.isInitialized());
+  }
+
+  /**
+   * Minimal provider which only records whether {@link #init(Configuration)} was called.
+   * Its server-side operations are unsupported.
+   */
+  public static class InitCheckingSaslServerAuthenticationProvider
+      implements SaslServerAuthenticationProvider {
+    public static final byte ID = (byte) 88;
+    private boolean initialized;
+
+    /** Marks this provider as initialized; {@code conf} is not read. */
+    public synchronized void init(Configuration conf) {
+      initialized = true;
+    }
+
+    /** @return {@code true} once {@link #init(Configuration)} has been called */
+    public synchronized boolean isInitialized() {
+      return this.initialized;
+    }
+
+    @Override
+    public SaslAuthMethod getSaslAuthMethod() {
+      return new SaslAuthMethod("INIT_CHECKING", ID, "DIGEST-MD5", AuthenticationMethod.TOKEN);
+    }
+
+    @Override
+    public String getTokenKind() {
+      return "INIT_CHECKING_TOKEN";
+    }
+
+    /** @throws UnsupportedOperationException always */
+    @Override
+    public AttemptingUserProvidingSaslServer createServer(
+        SecretManager<TokenIdentifier> secretManager,
+        Map<String, String> saslProps) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean supportsProtocolAuthentication() {
+      return false;
+    }
+
+    /** @throws UnsupportedOperationException always */
+    @Override
+    public UserGroupInformation getAuthorizedUgi(
+        String authzId, SecretManager<TokenIdentifier> secretManager)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
diff --git a/hbase-server/src/test/resources/log4j.properties b/hbase-server/src/test/resources/log4j.properties
index fcb6600..785371d 100644
--- a/hbase-server/src/test/resources/log4j.properties
+++ b/hbase-server/src/test/resources/log4j.properties
@@ -68,3 +68,4 @@ log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN
 log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN
 # Enable this to get detailed connection error/retry logging.
 # log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE
+log4j.logger.org.apache.directory=WARN


Mime
View raw message