kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [2/2] kafka git commit: KAFKA-3751; SASL/SCRAM implementation
Date Tue, 10 Jan 2017 13:05:16 GMT
KAFKA-3751; SASL/SCRAM implementation

Implementation of KIP-84: https://cwiki.apache.org/confluence/display/KAFKA/KIP-84%3A+Support+SASL+SCRAM+mechanisms

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2086 from rajinisivaram/KAFKA-3751


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/275c5e1d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/275c5e1d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/275c5e1d

Branch: refs/heads/trunk
Commit: 275c5e1df237808fe72b8d9933f826949d4b5781
Parents: 1fea1c3
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Tue Jan 10 08:05:07 2017 -0500
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Jan 10 08:05:07 2017 -0500

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   5 +
 .../org/apache/kafka/clients/ClientUtils.java   |   3 +-
 .../kafka/common/network/ChannelBuilders.java   |  43 ++-
 .../common/network/SaslChannelBuilder.java      |   9 +-
 .../apache/kafka/common/security/JaasUtils.java |  12 +-
 .../security/authenticator/CredentialCache.java |  71 ++++
 .../authenticator/SaslServerAuthenticator.java  |  12 +-
 .../common/security/kerberos/KerberosLogin.java |   2 +-
 .../common/security/plain/PlainSaslServer.java  |   2 +-
 .../common/security/scram/ScramCredential.java  |  50 +++
 .../security/scram/ScramCredentialCallback.java |  32 ++
 .../security/scram/ScramCredentialUtils.java    |  86 +++++
 .../common/security/scram/ScramFormatter.java   | 169 +++++++++
 .../common/security/scram/ScramLoginModule.java |  67 ++++
 .../common/security/scram/ScramMechanism.java   |  79 +++++
 .../common/security/scram/ScramMessages.java    | 272 +++++++++++++++
 .../common/security/scram/ScramSaslClient.java  | 239 +++++++++++++
 .../security/scram/ScramSaslClientProvider.java |  39 +++
 .../common/security/scram/ScramSaslServer.java  | 212 +++++++++++
 .../security/scram/ScramSaslServerProvider.java |  39 +++
 .../scram/ScramServerCallbackHandler.java       |  60 ++++
 .../kafka/common/network/NioEchoServer.java     |  15 +-
 .../authenticator/SaslAuthenticatorTest.java    | 143 +++++++-
 .../security/authenticator/TestJaasConfig.java  |  32 +-
 .../scram/ScramCredentialUtilsTest.java         |  97 ++++++
 .../security/scram/ScramFormatterTest.java      |  94 +++++
 .../security/scram/ScramMessagesTest.java       | 348 +++++++++++++++++++
 .../src/main/scala/kafka/admin/AdminUtils.scala |   7 +-
 .../main/scala/kafka/admin/ConfigCommand.scala  |  34 +-
 .../controller/ControllerChannelManager.scala   |   5 +-
 .../main/scala/kafka/network/SocketServer.scala |  13 +-
 .../kafka/security/CredentialProvider.scala     |  52 +++
 .../main/scala/kafka/server/ConfigHandler.scala |   6 +-
 .../main/scala/kafka/server/DynamicConfig.scala |  13 +
 .../main/scala/kafka/server/KafkaServer.scala   |  10 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   3 +-
 .../kafka/api/EndToEndAuthorizationTest.scala   |   2 +-
 .../SaslScramSslEndToEndAuthorizationTest.scala |  49 +++
 .../unit/kafka/admin/ConfigCommandTest.scala    |  48 +++
 .../integration/KafkaServerTestHarness.scala    |   6 +-
 .../unit/kafka/network/SocketServerTest.scala   |  14 +-
 .../scala/unit/kafka/utils/JaasTestUtils.scala  |  31 ++
 42 files changed, 2462 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 62cd77a..9fcd329 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -77,6 +77,11 @@
         <allow pkg="org.apache.kafka.common.errors" />
         <allow pkg="org.apache.kafka.clients" />
       </subpackage>
+      <subpackage name="scram">
+        <allow pkg="javax.crypto" />
+        <allow pkg="javax.xml.bind" />
+        <allow pkg="org.apache.kafka.common.errors" />
+      </subpackage>
     </subpackage>
 
     <subpackage name="protocol">

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index be663ca..b7ff715 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.network.LoginType;
-import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.SecurityProtocol;
@@ -86,7 +85,7 @@ public class ClientUtils {
         if (!SecurityProtocol.nonTestingValues().contains(securityProtocol))
             throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
         String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM);
-        return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs, clientSaslMechanism, true);
+        return ChannelBuilders.clientChannelBuilder(securityProtocol, LoginType.CLIENT, configs, clientSaslMechanism, true);
     }
 
     public static Collection<ApiVersionsResponse.ApiVersion> buildExpectedApiVersions(Collection<ApiKeys> apiKeys) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 2d6ba8a..5a1486c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -17,6 +17,7 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.Map;
@@ -27,22 +28,48 @@ public class ChannelBuilders {
 
     /**
      * @param securityProtocol the securityProtocol
-     * @param mode the mode, it must be non-null if `securityProtocol` is not `PLAINTEXT`;
-     *             it is ignored otherwise
      * @param loginType the loginType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise
-     * @param configs client/server configs
+     * @param configs client configs
      * @param clientSaslMechanism SASL mechanism if mode is CLIENT, ignored otherwise
      * @param saslHandshakeRequestEnable flag to enable Sasl handshake requests; disabled only for SASL
      *             inter-broker connections with inter-broker protocol version < 0.10
      * @return the configured `ChannelBuilder`
      * @throws IllegalArgumentException if `mode` invariants described above is not maintained
      */
-    public static ChannelBuilder create(SecurityProtocol securityProtocol,
+    public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol,
+            LoginType loginType,
+            Map<String, ?> configs,
+            String clientSaslMechanism,
+            boolean saslHandshakeRequestEnable) {
+
+        if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
+            if (loginType == null)
+                throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
+            if (clientSaslMechanism == null)
+                throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
+        }
+        return create(securityProtocol, Mode.CLIENT, loginType, configs, clientSaslMechanism, saslHandshakeRequestEnable, null);
+    }
+
+    /**
+     * @param securityProtocol the securityProtocol
+     * @param configs server configs
+     * @param credentialCache Credential cache for SASL/SCRAM if SCRAM is enabled
+     * @return the configured `ChannelBuilder`
+     */
+    public static ChannelBuilder serverChannelBuilder(SecurityProtocol securityProtocol,
+            Map<String, ?> configs,
+            CredentialCache credentialCache) {
+        return create(securityProtocol, Mode.SERVER, LoginType.SERVER, configs, null, true, credentialCache);
+    }
+
+    private static ChannelBuilder create(SecurityProtocol securityProtocol,
                                         Mode mode,
                                         LoginType loginType,
                                         Map<String, ?> configs,
                                         String clientSaslMechanism,
-                                        boolean saslHandshakeRequestEnable) {
+                                        boolean saslHandshakeRequestEnable,
+                                        CredentialCache credentialCache) {
         ChannelBuilder channelBuilder;
 
         switch (securityProtocol) {
@@ -53,11 +80,7 @@ public class ChannelBuilders {
             case SASL_SSL:
             case SASL_PLAINTEXT:
                 requireNonNullMode(mode, securityProtocol);
-                if (loginType == null)
-                    throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
-                if (mode == Mode.CLIENT && clientSaslMechanism == null)
-                    throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
-                channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
+                channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache);
                 break;
             case PLAINTEXT:
             case TRACE:

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index ba2f7df..b556f38 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -23,6 +23,7 @@ import javax.security.auth.login.Configuration;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.authenticator.LoginManager;
 import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
@@ -40,6 +41,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
     private final Mode mode;
     private final LoginType loginType;
     private final boolean handshakeRequestEnable;
+    private final CredentialCache credentialCache;
 
     private Configuration jaasConfig;
     private LoginManager loginManager;
@@ -47,12 +49,14 @@ public class SaslChannelBuilder implements ChannelBuilder {
     private Map<String, ?> configs;
     private KerberosShortNamer kerberosShortNamer;
 
-    public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol, String clientSaslMechanism, boolean handshakeRequestEnable) {
+    public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol,
+            String clientSaslMechanism, boolean handshakeRequestEnable, CredentialCache credentialCache) {
         this.mode = mode;
         this.loginType = loginType;
         this.securityProtocol = securityProtocol;
         this.handshakeRequestEnable = handshakeRequestEnable;
         this.clientSaslMechanism = clientSaslMechanism;
+        this.credentialCache = credentialCache;
     }
 
     public void configure(Map<String, ?> configs) throws KafkaException {
@@ -98,7 +102,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
             Authenticator authenticator;
             if (mode == Mode.SERVER)
                 authenticator = new SaslServerAuthenticator(id, jaasConfig, loginManager.subject(), kerberosShortNamer,
-                        socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize);
+                        socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize, credentialCache);
             else
                 authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
                         socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable);
@@ -124,5 +128,4 @@ public class SaslChannelBuilder implements ChannelBuilder {
             return new PlaintextTransportLayer(key);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
index aa328d4..10e591d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -83,17 +83,19 @@ public class JaasUtils {
 
     /**
      * Returns the configuration option for <code>key</code> from the server login context
-     * of the default JAAS configuration.
+     * of the default JAAS configuration. If login module name is specified, return option value
+     * only from that module.
      */
-    public static String defaultServerJaasConfigOption(String key) throws IOException {
-        return jaasConfigOption(Configuration.getConfiguration(), LoginType.SERVER.contextName(), key);
+    public static String defaultServerJaasConfigOption(String key, String loginModuleName) throws IOException {
+        return jaasConfigOption(Configuration.getConfiguration(), LoginType.SERVER.contextName(), key, loginModuleName);
     }
 
     /**
      * Returns the configuration option for <code>key</code> from the login context
      * <code>loginContextName</code> of the specified JAAS configuration.
+     * If login module name is specified, return option value only from that module.
      */
-    public static String jaasConfigOption(Configuration jaasConfig, String loginContextName, String key) throws IOException {
+    public static String jaasConfigOption(Configuration jaasConfig, String loginContextName, String key, String loginModuleName) throws IOException {
         AppConfigurationEntry[] configurationEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
         if (configurationEntries == null) {
             String errorMessage = "Could not find a '" + loginContextName + "' entry in this JAAS configuration.";
@@ -101,6 +103,8 @@ public class JaasUtils {
         }
 
         for (AppConfigurationEntry entry: configurationEntries) {
+            if (loginModuleName != null && !loginModuleName.equals(entry.getLoginModuleName()))
+                continue;
             Object val = entry.getOptions().get(key);
             if (val != null)
                 return (String) val;

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
new file mode 100644
index 0000000..568dcb5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.authenticator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CredentialCache {
+
+    private final Map<String, Cache<? extends Object>> cacheMap = new HashMap<>();
+
+    public <C> Cache<C> createCache(String mechanism, Class<C> credentialClass) {
+        Cache<C> cache = new Cache<C>(credentialClass);
+        cacheMap.put(mechanism, cache);
+        return cache;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <C> Cache<C> cache(String mechanism, Class<C> credentialClass) {
+        Cache<?> cache = cacheMap.get(mechanism);
+        if (cache != null) {
+            if (cache.credentialClass() != credentialClass)
+                throw new IllegalArgumentException("Invalid credential class " + credentialClass + ", expected " + cache.credentialClass());
+            return (Cache<C>) cache;
+        } else
+            return null;
+    }
+
+    public static class Cache<C> {
+        private final Class<C> credentialClass;
+        private final ConcurrentHashMap<String, C> credentials;
+
+        public Cache(Class<C> credentialClass) {
+            this.credentialClass = credentialClass;
+            this.credentials = new ConcurrentHashMap<>();
+        }
+
+        public C get(String username) {
+            return credentials.get(username);
+        }
+
+        public C put(String username, C credential) {
+            return credentials.put(username, credential);
+        }
+
+        public C remove(String username) {
+            return credentials.remove(username);
+        }
+
+        public Class<C> credentialClass() {
+            return credentialClass;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 27744ad..069e12f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -44,6 +44,9 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.ScramMechanism;
+import org.apache.kafka.common.security.scram.ScramServerCallbackHandler;
 import org.ietf.jgss.GSSContext;
 import org.ietf.jgss.GSSCredential;
 import org.ietf.jgss.GSSException;
@@ -83,6 +86,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private final KerberosShortNamer kerberosNamer;
     private final int maxReceiveSize;
     private final String host;
+    private final CredentialCache credentialCache;
 
     // Current SASL state
     private SaslState saslState = SaslState.GSSAPI_OR_HANDSHAKE_REQUEST;
@@ -101,7 +105,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
 
-    public SaslServerAuthenticator(String node, Configuration jaasConfig, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize) throws IOException {
+    public SaslServerAuthenticator(String node, Configuration jaasConfig, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize, CredentialCache credentialCache) throws IOException {
         if (subject == null)
             throw new IllegalArgumentException("subject cannot be null");
         this.node = node;
@@ -110,6 +114,7 @@ public class SaslServerAuthenticator implements Authenticator {
         this.kerberosNamer = kerberosNameParser;
         this.maxReceiveSize = maxReceiveSize;
         this.host = host;
+        this.credentialCache = credentialCache;
     }
 
     public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) {
@@ -123,7 +128,10 @@ public class SaslServerAuthenticator implements Authenticator {
 
     private void createSaslServer(String mechanism) throws IOException {
         this.saslMechanism = mechanism;
-        callbackHandler = new SaslServerCallbackHandler(jaasConfig, kerberosNamer);
+        if (!ScramMechanism.isScram(mechanism))
+            callbackHandler = new SaslServerCallbackHandler(jaasConfig, kerberosNamer);
+        else
+            callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class));
         callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
         if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) {
             if (subject.getPrincipals().isEmpty())

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index d8a040b..d2baf91 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -294,7 +294,7 @@ public class KerberosLogin extends AbstractLogin {
     private String getServiceName(Configuration jaasConfig, Map<String, ?> configs, String loginContext) {
         String jaasServiceName;
         try {
-            jaasServiceName = JaasUtils.jaasConfigOption(jaasConfig, loginContext, JaasUtils.SERVICE_NAME);
+            jaasServiceName = JaasUtils.jaasConfigOption(jaasConfig, loginContext, JaasUtils.SERVICE_NAME, null);
         } catch (IOException e) {
             throw new KafkaException("JAAS configuration entry not found", e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
index ec7d696..0928057 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
@@ -92,7 +92,7 @@ public class PlainSaslServer implements SaslServer {
             authorizationID = username;
 
         try {
-            String expectedPassword = JaasUtils.defaultServerJaasConfigOption(JAAS_USER_PREFIX + username);
+            String expectedPassword = JaasUtils.defaultServerJaasConfigOption(JAAS_USER_PREFIX + username, PlainLoginModule.class.getName());
             if (!password.equals(expectedPassword)) {
                 throw new SaslException("Authentication failed: Invalid username or password");
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredential.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredential.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredential.java
new file mode 100644
index 0000000..7de48f2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredential.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+public class ScramCredential {
+
+    private final byte[] salt;
+    private final byte[] serverKey;
+    private final byte[] storedKey;
+    private final int iterations;
+
+    public ScramCredential(byte[] salt, byte[] storedKey, byte[] serverKey, int iterations) {
+        this.salt = salt;
+        this.serverKey = serverKey;
+        this.storedKey = storedKey;
+        this.iterations = iterations;
+    }
+
+    public byte[] salt() {
+        return salt;
+    }
+
+    public byte[] serverKey() {
+        return serverKey;
+    }
+
+    public byte[] storedKey() {
+        return storedKey;
+    }
+
+    public int iterations() {
+        return iterations;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
new file mode 100644
index 0000000..07c1c93
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import javax.security.auth.callback.Callback;
+
+public class ScramCredentialCallback implements Callback {
+    private ScramCredential scramCredential;
+
+    public ScramCredential scramCredential() {
+        return scramCredential;
+    }
+
+    public void scramCredential(ScramCredential scramCredential) {
+        this.scramCredential = scramCredential;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
new file mode 100644
index 0000000..359e46d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import java.util.Collection;
+import java.util.Properties;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+
+/**
+ * SCRAM Credential persistence utility functions. Implements format conversion used
+ * for the credential store implemented in Kafka. Credentials are persisted as a comma-separated
+ * String of key-value pairs:
+ * <pre>
+ *   salt=<i>salt</i>,stored_key=<i>stored_key</i>,server_key=<i>server_key</i>,iterations=<i>iterations</i>
+ * </pre>
+ *
+ */
+public class ScramCredentialUtils {
+    private static final String SALT = "salt";
+    private static final String STORED_KEY = "stored_key";
+    private static final String SERVER_KEY = "server_key";
+    private static final String ITERATIONS = "iterations";
+
+    public static String credentialToString(ScramCredential credential) {
+        return String.format("%s=%s,%s=%s,%s=%s,%s=%d",
+               SALT,
+               DatatypeConverter.printBase64Binary(credential.salt()),
+               STORED_KEY,
+               DatatypeConverter.printBase64Binary(credential.storedKey()),
+               SERVER_KEY,
+               DatatypeConverter.printBase64Binary(credential.serverKey()),
+               ITERATIONS,
+               credential.iterations());
+    }
+
+    public static ScramCredential credentialFromString(String str) {
+        Properties props = toProps(str);
+        if (props.size() != 4 || !props.containsKey(SALT) || !props.containsKey(STORED_KEY) ||
+                !props.containsKey(SERVER_KEY) || !props.containsKey(ITERATIONS)) {
+            throw new IllegalArgumentException("Credentials not valid: " + str);
+        }
+        byte[] salt = DatatypeConverter.parseBase64Binary(props.getProperty(SALT));
+        byte[] storedKey = DatatypeConverter.parseBase64Binary(props.getProperty(STORED_KEY));
+        byte[] serverKey = DatatypeConverter.parseBase64Binary(props.getProperty(SERVER_KEY));
+        int iterations = Integer.parseInt(props.getProperty(ITERATIONS));
+        return new ScramCredential(salt, storedKey, serverKey, iterations);
+    }
+
+    private static Properties toProps(String str) {
+        Properties props = new Properties();
+        String[] tokens = str.split(",");
+        for (String token : tokens) {
+            int index = token.indexOf('=');
+            if (index <= 0)
+                throw new IllegalArgumentException("Credentials not valid: " + str);
+            props.put(token.substring(0, index), token.substring(index + 1));
+        }
+        return props;
+    }
+
+    public static void createCache(CredentialCache cache, Collection<String> enabledMechanisms) {
+        for (String mechanism : ScramMechanism.mechanismNames()) {
+            if (enabledMechanisms.contains(mechanism))
+                cache.createCache(mechanism, ScramCredential.class);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramFormatter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramFormatter.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramFormatter.java
new file mode 100644
index 0000000..e600573
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramFormatter.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidKeyException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ServerFirstMessage;
+
+/**
+ * Scram message salt and hash functions defined in <a href="https://tools.ietf.org/html/rfc5802">RFC 5802</a>.
+ */
+public class ScramFormatter {
+
+    private final MessageDigest messageDigest;
+    private final Mac mac;
+    private final SecureRandom random;
+
+    public ScramFormatter(ScramMechanism mechanism) throws NoSuchAlgorithmException {
+        this.messageDigest = MessageDigest.getInstance(mechanism.hashAlgorithm());
+        this.mac = Mac.getInstance(mechanism.macAlgorithm());
+        this.random = new SecureRandom();
+    }
+
+    public byte[] hmac(byte[] key, byte[] bytes) throws InvalidKeyException {
+        mac.init(new SecretKeySpec(key, mac.getAlgorithm()));
+        return mac.doFinal(bytes);
+    }
+
+    public byte[] hash(byte[] str) {
+        return messageDigest.digest(str);
+    }
+
+    public byte[] xor(byte[] first, byte[] second) {
+        if (first.length != second.length)
+            throw new IllegalArgumentException("Argument arrays must be of the same length");
+        byte[] result = new byte[first.length];
+        for (int i = 0; i < result.length; i++)
+            result[i] = (byte) (first[i] ^ second[i]);
+        return result;
+    }
+
+    public byte[] hi(byte[] str, byte[] salt, int iterations) throws InvalidKeyException {
+        mac.init(new SecretKeySpec(str, mac.getAlgorithm()));
+        mac.update(salt);
+        byte[] u1 = mac.doFinal(new byte[]{0, 0, 0, 1});
+        byte[] prev = u1;
+        byte[] result = u1;
+        for (int i = 2; i <= iterations; i++) {
+            byte[] ui = hmac(str, prev);
+            result = xor(result, ui);
+            prev = ui;
+        }
+        return result;
+    }
+
+    public byte[] normalize(String str) {
+        return toBytes(str);
+    }
+
+    public byte[] saltedPassword(String password, byte[] salt, int iterations) throws InvalidKeyException {
+        return hi(normalize(password), salt, iterations);
+    }
+
+    public byte[] clientKey(byte[] saltedPassword) throws InvalidKeyException {
+        return hmac(saltedPassword, toBytes("Client Key"));
+    }
+
+    public byte[] storedKey(byte[] clientKey) {
+        return hash(clientKey);
+    }
+
+    public String saslName(String username) {
+        return username.replace("=", "=3D").replace(",", "=2C");
+    }
+
+    public String username(String saslName) {
+        String username = saslName.replace("=2C", ",");
+        if (username.replace("=3D", "").indexOf('=') >= 0)
+            throw new IllegalArgumentException("Invalid username: " + saslName);
+        return username.replace("=3D", "=");
+    }
+
+    public String authMessage(String clientFirstMessageBare, String serverFirstMessage, String clientFinalMessageWithoutProof) {
+        return clientFirstMessageBare + "," + serverFirstMessage + "," + clientFinalMessageWithoutProof;
+    }
+
+    public byte[] clientSignature(byte[] storedKey, ClientFirstMessage clientFirstMessage, ServerFirstMessage serverFirstMessage, ClientFinalMessage clientFinalMessage) throws InvalidKeyException {
+        byte[] authMessage = authMessage(clientFirstMessage, serverFirstMessage, clientFinalMessage);
+        return hmac(storedKey, authMessage);
+    }
+
+    public byte[] clientProof(byte[] saltedPassword, ClientFirstMessage clientFirstMessage, ServerFirstMessage serverFirstMessage, ClientFinalMessage clientFinalMessage) throws InvalidKeyException {
+        byte[] clientKey = clientKey(saltedPassword);
+        byte[] storedKey = hash(clientKey);
+        byte[] clientSignature = hmac(storedKey, authMessage(clientFirstMessage, serverFirstMessage, clientFinalMessage));
+        return xor(clientKey, clientSignature);
+    }
+
+    private byte[] authMessage(ClientFirstMessage clientFirstMessage, ServerFirstMessage serverFirstMessage, ClientFinalMessage clientFinalMessage) {
+        return toBytes(authMessage(clientFirstMessage.clientFirstMessageBare(),
+                serverFirstMessage.toMessage(),
+                clientFinalMessage.clientFinalMessageWithoutProof()));
+    }
+
+    public byte[] storedKey(byte[] clientSignature, byte[] clientProof) {
+        return hash(xor(clientSignature, clientProof));
+    }
+
+    public byte[] serverKey(byte[] saltedPassword) throws InvalidKeyException {
+        return hmac(saltedPassword, toBytes("Server Key"));
+    }
+
+    public byte[] serverSignature(byte[] serverKey, ClientFirstMessage clientFirstMessage, ServerFirstMessage serverFirstMessage, ClientFinalMessage clientFinalMessage) throws InvalidKeyException {
+        byte[] authMessage = authMessage(clientFirstMessage, serverFirstMessage, clientFinalMessage);
+        return hmac(serverKey, authMessage);
+    }
+
+    public String secureRandomString() {
+        return new BigInteger(130, random).toString(Character.MAX_RADIX);
+    }
+
+    public byte[] secureRandomBytes() {
+        return toBytes(secureRandomString());
+    }
+
+    public byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    public ScramCredential generateCredential(String password, int iterations) {
+        try {
+            byte[] salt = secureRandomBytes();
+            byte[] saltedPassword = saltedPassword(password, salt, iterations);
+            byte[] clientKey = clientKey(saltedPassword);
+            byte[] storedKey = storedKey(clientKey);
+            byte[] serverKey = serverKey(saltedPassword);
+            return new ScramCredential(salt, storedKey, serverKey, iterations);
+        } catch (InvalidKeyException e) {
+            throw new KafkaException("Could not create credential", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
new file mode 100644
index 0000000..1e4b643
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.spi.LoginModule;
+
+public class ScramLoginModule implements LoginModule {
+
+    private static final String USERNAME_CONFIG = "username";
+    private static final String PASSWORD_CONFIG = "password";
+
+    static {
+        ScramSaslClientProvider.initialize();
+        ScramSaslServerProvider.initialize();
+    }
+
+    @Override
+    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
+        String username = (String) options.get(USERNAME_CONFIG);
+        if (username != null)
+            subject.getPublicCredentials().add(username);
+        String password = (String) options.get(PASSWORD_CONFIG);
+        if (password != null)
+            subject.getPrivateCredentials().add(password);
+    }
+
+    @Override
+    public boolean login() throws LoginException {
+        return true;
+    }
+
+    @Override
+    public boolean logout() throws LoginException {
+        return true;
+    }
+
+    @Override
+    public boolean commit() throws LoginException {
+        return true;
+    }
+
+    @Override
+    public boolean abort() throws LoginException {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMechanism.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMechanism.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMechanism.java
new file mode 100644
index 0000000..86b51bc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMechanism.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public enum ScramMechanism {
+
+    SCRAM_SHA_256("SHA-256", "HmacSHA256", 4096),
+    SCRAM_SHA_512("SHA-512", "HmacSHA512", 4096);
+
+    private final String mechanismName;
+    private final String hashAlgorithm;
+    private final String macAlgorithm;
+    private final int minIterations;
+
+    private static final Map<String, ScramMechanism> MECHANISMS_MAP;
+
+    static {
+        Map<String, ScramMechanism> map = new HashMap<>();
+        for (ScramMechanism mech : values())
+            map.put(mech.mechanismName, mech);
+        MECHANISMS_MAP = Collections.unmodifiableMap(map);
+    }
+
+    ScramMechanism(String hashAlgorithm, String macAlgorithm, int minIterations) {
+        this.mechanismName = "SCRAM-" + hashAlgorithm;
+        this.hashAlgorithm = hashAlgorithm;
+        this.macAlgorithm = macAlgorithm;
+        this.minIterations = minIterations;
+    }
+
+    public final String mechanismName() {
+        return mechanismName;
+    }
+
+    public String hashAlgorithm() {
+        return hashAlgorithm;
+    }
+
+    public String macAlgorithm() {
+        return macAlgorithm;
+    }
+
+    public int minIterations() {
+        return minIterations;
+    }
+
+    public static ScramMechanism forMechanismName(String mechanismName) {
+        return MECHANISMS_MAP.get(mechanismName);
+    }
+
+    public static Collection<String> mechanismNames() {
+        return MECHANISMS_MAP.keySet();
+    }
+
+    public static boolean isScram(String mechanismName) {
+        return MECHANISMS_MAP.containsKey(mechanismName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
new file mode 100644
index 0000000..3f94e65
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import java.nio.charset.StandardCharsets;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.security.sasl.SaslException;
+import javax.xml.bind.DatatypeConverter;
+
+/**
+ * SCRAM request/response message creation and parsing based on
+ * <a href="https://tools.ietf.org/html/rfc5802">RFC 5802</a>
+ *
+ */
+public class ScramMessages {
+
+    static abstract class AbstractScramMessage {
+
+        static final String ALPHA = "[A-Za-z]+";
+        static final String VALUE_SAFE = "[\\x01-\\x7F&&[^=,]]+";
+        static final String VALUE = "[\\x01-\\x7F&&[^,]]+";
+        static final String PRINTABLE = "[\\x21-\\x7E&&[^,]]+";
+        static final String SASLNAME = "(?:[\\x01-\\x7F&&[^=,]]|=2C|=3D)+";
+        static final String BASE64_CHAR = "[a-zA-Z0-9/+]";
+        static final String BASE64 = String.format("(?:%s{4})*(?:%s{3}=|%s{2}==)?", BASE64_CHAR, BASE64_CHAR, BASE64_CHAR);
+        static final String RESERVED = String.format("(m=%s,)?", VALUE);
+        static final String EXTENSIONS = String.format("(,%s=%s)*", ALPHA, VALUE);
+
+        abstract String toMessage();
+
+        public byte[] toBytes() {
+            return toMessage().getBytes(StandardCharsets.UTF_8);
+        }
+
+        protected String toMessage(byte[] messageBytes) {
+            return new String(messageBytes, StandardCharsets.UTF_8);
+        }
+    }
+
+    /**
+     * Format:
+     *     gs2-header [reserved-mext ","] username "," nonce ["," extensions]
+     * Limitations:
+     *     Only gs2-header "n" is supported.
+     *     Extensions are ignored.
+     *
+     */
+    public static class ClientFirstMessage extends AbstractScramMessage {
+        private static final Pattern PATTERN = Pattern.compile(String.format(
+                "n,(a=(?<authzid>%s))?,%sn=(?<saslname>%s),r=(?<nonce>%s)%s",
+                SASLNAME,
+                RESERVED,
+                SASLNAME,
+                PRINTABLE,
+                EXTENSIONS));
+
+        private final String saslName;
+        private final String nonce;
+        private final String authorizationId;
+        public ClientFirstMessage(byte[] messageBytes) throws SaslException {
+            String message = toMessage(messageBytes);
+            Matcher matcher = PATTERN.matcher(message);
+            if (!matcher.matches())
+                throw new SaslException("Invalid SCRAM client first message format: " + message);
+            String authzid = matcher.group("authzid");
+            this.authorizationId = authzid != null ? authzid : "";
+            this.saslName = matcher.group("saslname");
+            this.nonce = matcher.group("nonce");
+        }
+        public ClientFirstMessage(String saslName, String nonce) {
+            this.saslName = saslName;
+            this.nonce = nonce;
+            this.authorizationId = ""; // Optional authzid not specified in gs2-header
+        }
+        public String saslName() {
+            return saslName;
+        }
+        public String nonce() {
+            return nonce;
+        }
+        public String authorizationId() {
+            return authorizationId;
+        }
+        public String gs2Header() {
+            return "n," + authorizationId + ",";
+        }
+        public String clientFirstMessageBare() {
+            return String.format("n=%s,r=%s", saslName, nonce);
+        }
+        String toMessage() {
+            return gs2Header() + clientFirstMessageBare();
+        }
+    }
+
+    /**
+     * Format:
+     *     [reserved-mext ","] nonce "," salt "," iteration-count ["," extensions]
+     * Limitations:
+     *     Extensions are ignored.
+     *
+     */
+    public static class ServerFirstMessage extends AbstractScramMessage {
+        private static final Pattern PATTERN = Pattern.compile(String.format(
+                "%sr=(?<nonce>%s),s=(?<salt>%s),i=(?<iterations>[0-9]+)%s",
+                RESERVED,
+                PRINTABLE,
+                BASE64,
+                EXTENSIONS));
+
+        private final String nonce;
+        private final byte[] salt;
+        private final int iterations;
+        public ServerFirstMessage(byte[] messageBytes) throws SaslException {
+            String message = toMessage(messageBytes);
+            Matcher matcher = PATTERN.matcher(message);
+            if (!matcher.matches())
+                throw new SaslException("Invalid SCRAM server first message format: " + message);
+            try {
+                this.iterations = Integer.parseInt(matcher.group("iterations"));
+                if (this.iterations <= 0)
+                    throw new SaslException("Invalid SCRAM server first message format: invalid iterations " + iterations);
+            } catch (NumberFormatException e) {
+                throw new SaslException("Invalid SCRAM server first message format: invalid iterations");
+            }
+            this.nonce = matcher.group("nonce");
+            String salt = matcher.group("salt");
+            this.salt = DatatypeConverter.parseBase64Binary(salt);
+        }
+        public ServerFirstMessage(String clientNonce, String serverNonce, byte[] salt, int iterations) {
+            this.nonce = clientNonce + serverNonce;
+            this.salt = salt;
+            this.iterations = iterations;
+        }
+        public String nonce() {
+            return nonce;
+        }
+        public byte[] salt() {
+            return salt;
+        }
+        public int iterations() {
+            return iterations;
+        }
+        String toMessage() {
+            return String.format("r=%s,s=%s,i=%d", nonce, DatatypeConverter.printBase64Binary(salt), iterations);
+        }
+    }
+    /**
+     * Format:
+     *     channel-binding "," nonce ["," extensions]"," proof
+     * Limitations:
+     *     Extensions are ignored.
+     *
+     */
+    public static class ClientFinalMessage extends AbstractScramMessage {
+        private static final Pattern PATTERN = Pattern.compile(String.format(
+                "c=(?<channel>%s),r=(?<nonce>%s)%s,p=(?<proof>%s)",
+                BASE64,
+                PRINTABLE,
+                EXTENSIONS,
+                BASE64));
+
+        private final byte[] channelBinding;
+        private final String nonce;
+        private byte[] proof;
+        public ClientFinalMessage(byte[] messageBytes) throws SaslException {
+            String message = toMessage(messageBytes);
+            Matcher matcher = PATTERN.matcher(message);
+            if (!matcher.matches())
+                throw new SaslException("Invalid SCRAM client final message format: " + message);
+
+            this.channelBinding = DatatypeConverter.parseBase64Binary(matcher.group("channel"));
+            this.nonce = matcher.group("nonce");
+            this.proof = DatatypeConverter.parseBase64Binary(matcher.group("proof"));
+        }
+        public ClientFinalMessage(byte[] channelBinding, String nonce) {
+            this.channelBinding = channelBinding;
+            this.nonce = nonce;
+        }
+        public byte[] channelBinding() {
+            return channelBinding;
+        }
+        public String nonce() {
+            return nonce;
+        }
+        public byte[] proof() {
+            return proof;
+        }
+        public void proof(byte[] proof) {
+            this.proof = proof;
+        }
+        public String clientFinalMessageWithoutProof() {
+            return String.format("c=%s,r=%s",
+                    DatatypeConverter.printBase64Binary(channelBinding),
+                    nonce);
+        }
+        String toMessage() {
+            return String.format("%s,p=%s",
+                    clientFinalMessageWithoutProof(),
+                    DatatypeConverter.printBase64Binary(proof));
+        }
+    }
+    /**
+     * Format:
+     *     ("e=" server-error-value | "v=" base64_server_signature) ["," extensions]
+     * Limitations:
+     *     Extensions are ignored.
+     *
+     */
+    public static class ServerFinalMessage extends AbstractScramMessage {
+        private static final Pattern PATTERN = Pattern.compile(String.format(
+                "(?:e=(?<error>%s))|(?:v=(?<signature>%s))%s",
+                VALUE_SAFE,
+                BASE64,
+                EXTENSIONS));
+
+        private final String error;
+        private final byte[] serverSignature;
+        public ServerFinalMessage(byte[] messageBytes) throws SaslException {
+            String message = toMessage(messageBytes);
+            Matcher matcher = PATTERN.matcher(message);
+            if (!matcher.matches())
+                throw new SaslException("Invalid SCRAM server final message format: " + message);
+            String error = null;
+            try {
+                error = matcher.group("error");
+            } catch (IllegalArgumentException e) {
+                // ignore
+            }
+            if (error == null) {
+                this.serverSignature = DatatypeConverter.parseBase64Binary(matcher.group("signature"));
+                this.error = null;
+            } else {
+                this.serverSignature = null;
+                this.error = error;
+            }
+        }
+        public ServerFinalMessage(String error, byte[] serverSignature) {
+            this.error = error;
+            this.serverSignature = serverSignature;
+        }
+        public String error() {
+            return error;
+        }
+        public byte[] serverSignature() {
+            return serverSignature;
+        }
+        String toMessage() {
+            if (error != null)
+                return "e=" + error;
+            else
+                return "v=" + DatatypeConverter.printBase64Binary(serverSignature);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
new file mode 100644
index 0000000..4220a69
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ServerFirstMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SaslClient implementation for SASL/SCRAM.
+ * <p>
+ * This implementation expects a login module that populates username as
+ * the Subject's public credential and password as the private credential.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc5802">RFC 5802</a>
+ *
+ */
+public class ScramSaslClient implements SaslClient {
+
+    private static final Logger log = LoggerFactory.getLogger(ScramSaslClient.class);
+
+    enum State {
+        SEND_CLIENT_FIRST_MESSAGE,
+        RECEIVE_SERVER_FIRST_MESSAGE,
+        RECEIVE_SERVER_FINAL_MESSAGE,
+        COMPLETE,
+        FAILED
+    };
+
+    private final ScramMechanism mechanism;
+    private final CallbackHandler callbackHandler;
+    private final ScramFormatter formatter;
+    private String clientNonce;
+    private State state;
+    private byte[] saltedPassword;
+    private ScramMessages.ClientFirstMessage clientFirstMessage;
+    private ScramMessages.ServerFirstMessage serverFirstMessage;
+    private ScramMessages.ClientFinalMessage clientFinalMessage;
+
+    public ScramSaslClient(ScramMechanism mechanism, CallbackHandler cbh) throws NoSuchAlgorithmException {
+        this.mechanism = mechanism;
+        this.callbackHandler = cbh;
+        this.formatter = new ScramFormatter(mechanism);
+        setState(State.SEND_CLIENT_FIRST_MESSAGE);
+    }
+
+    @Override
+    public String getMechanismName() {
+        return mechanism.mechanismName();
+    }
+
+    @Override
+    public boolean hasInitialResponse() {
+        return true;
+    }
+
+    @Override
+    public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
+        try {
+            switch (state) {
+                case SEND_CLIENT_FIRST_MESSAGE:
+                    if (challenge != null && challenge.length != 0)
+                        throw new SaslException("Expected empty challenge");
+                    clientNonce = formatter.secureRandomString();
+                    NameCallback nameCallback = new NameCallback("Name:");
+                    try {
+                        callbackHandler.handle(new Callback[]{nameCallback});
+                    } catch (IOException | UnsupportedCallbackException e) {
+                        throw new SaslException("User name could not be obtained", e);
+                    }
+                    String username = nameCallback.getName();
+                    String saslName = formatter.saslName(username);
+                    this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce);
+                    setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
+                    return clientFirstMessage.toBytes();
+
+                case RECEIVE_SERVER_FIRST_MESSAGE:
+                    this.serverFirstMessage = new ServerFirstMessage(challenge);
+                    if (!serverFirstMessage.nonce().startsWith(clientNonce))
+                        throw new SaslException("Invalid server nonce: does not start with client nonce");
+                    if (serverFirstMessage.iterations() < mechanism.minIterations())
+                        throw new SaslException("Requested iterations " + serverFirstMessage.iterations() +  " is less than the minimum " + mechanism.minIterations() + " for " + mechanism);
+                    PasswordCallback passwordCallback = new PasswordCallback("Password:", false);
+                    try {
+                        callbackHandler.handle(new Callback[]{passwordCallback});
+                    } catch (IOException | UnsupportedCallbackException e) {
+                        throw new SaslException("User name could not be obtained", e);
+                    }
+                    this.clientFinalMessage = handleServerFirstMessage(passwordCallback.getPassword());
+                    setState(State.RECEIVE_SERVER_FINAL_MESSAGE);
+                    return clientFinalMessage.toBytes();
+
+                case RECEIVE_SERVER_FINAL_MESSAGE:
+                    ServerFinalMessage serverFinalMessage = new ServerFinalMessage(challenge);
+                    if (serverFinalMessage.error() != null)
+                        throw new SaslException("Sasl authentication using " + mechanism + " failed with error: " + serverFinalMessage.error());
+                    handleServerFinalMessage(serverFinalMessage.serverSignature());
+                    setState(State.COMPLETE);
+                    return null;
+
+                default:
+                    throw new IllegalSaslStateException("Unexpected challenge in Sasl client state " + state);
+            }
+        } catch (SaslException e) {
+            setState(State.FAILED);
+            throw e;
+        }
+    }
+
+    @Override
+    public boolean isComplete() {
+        return state == State.COMPLETE;
+    }
+
+    @Override
+    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+        if (!isComplete())
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return Arrays.copyOfRange(incoming, offset, offset + len);
+    }
+
+    @Override
+    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+        if (!isComplete())
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return Arrays.copyOfRange(outgoing, offset, offset + len);
+    }
+
+    @Override
+    public Object getNegotiatedProperty(String propName) {
+        if (!isComplete())
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return null;
+    }
+
+    @Override
+    public void dispose() throws SaslException {
+    }
+
+    private void setState(State state) {
+        log.debug("Setting SASL/{} client state to {}", mechanism, state);
+        this.state = state;
+    }
+
+    private ClientFinalMessage handleServerFirstMessage(char[] password) throws SaslException {
+        try {
+            byte[] passwordBytes = formatter.normalize(new String(password));
+            this.saltedPassword = formatter.hi(passwordBytes, serverFirstMessage.salt(), serverFirstMessage.iterations());
+
+            ClientFinalMessage clientFinalMessage = new ClientFinalMessage("n,,".getBytes(StandardCharsets.UTF_8), serverFirstMessage.nonce());
+            byte[] clientProof = formatter.clientProof(saltedPassword, clientFirstMessage, serverFirstMessage, clientFinalMessage);
+            clientFinalMessage.proof(clientProof);
+            return clientFinalMessage;
+        } catch (InvalidKeyException e) {
+            throw new SaslException("Client final message could not be created", e);
+        }
+    }
+
+    private void handleServerFinalMessage(byte[] signature) throws SaslException {
+        try {
+            byte[] serverKey = formatter.serverKey(saltedPassword);
+            byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage);
+            if (!Arrays.equals(signature, serverSignature))
+                throw new SaslException("Invalid server signature in server final message");
+        } catch (InvalidKeyException e) {
+            throw new SaslException("Sasl server signature verification failed", e);
+        }
+    }
+
+    public static class ScramSaslClientFactory implements SaslClientFactory {
+
+        @Override
+        public SaslClient createSaslClient(String[] mechanisms,
+                String authorizationId,
+                String protocol,
+                String serverName,
+                Map<String, ?> props,
+                CallbackHandler cbh) throws SaslException {
+
+            ScramMechanism mechanism = null;
+            for (String mech : mechanisms) {
+                mechanism = ScramMechanism.forMechanismName(mech);
+                if (mechanism != null)
+                    break;
+            }
+            if (mechanism == null)
+                throw new SaslException(String.format("Requested mechanisms '%s' not supported. Supported mechanisms are '%s'.",
+                        Arrays.asList(mechanisms), ScramMechanism.mechanismNames()));
+
+            try {
+                return new ScramSaslClient(mechanism, cbh);
+            } catch (NoSuchAlgorithmException e) {
+                throw new SaslException("Hash algorithm not supported for mechanism " + mechanism, e);
+            }
+        }
+
+        @Override
+        public String[] getMechanismNames(Map<String, ?> props) {
+            Collection<String> mechanisms = ScramMechanism.mechanismNames();
+            return mechanisms.toArray(new String[mechanisms.size()]);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java
new file mode 100644
index 0000000..aa5fc56
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import java.security.Provider;
+import java.security.Security;
+
+import org.apache.kafka.common.security.scram.ScramSaslClient.ScramSaslClientFactory;
+
+public class ScramSaslClientProvider extends Provider {
+
+    private static final long serialVersionUID = 1L;
+
+    protected ScramSaslClientProvider() {
+        super("SASL/SCRAM Client Provider", 1.0, "SASL/SCRAM Client Provider for Kafka");
+        for (ScramMechanism mechanism : ScramMechanism.values())
+            super.put("SaslClientFactory." + mechanism.mechanismName(), ScramSaslClientFactory.class.getName());
+    }
+
+    public static void initialize() {
+        Security.addProvider(new ScramSaslClientProvider());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
new file mode 100644
index 0000000..4298f98
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import java.io.IOException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
+import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ServerFirstMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SaslServer implementation for SASL/SCRAM. This server is configured with a callback
+ * handler for integration with a credential manager. Kafka brokers provide callbacks
+ * based on a Zookeeper-based password store.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc5802">RFC 5802</a>
+ */
+public class ScramSaslServer implements SaslServer {
+
+    private static final Logger log = LoggerFactory.getLogger(ScramSaslServer.class);
+
+    enum State {
+        RECEIVE_CLIENT_FIRST_MESSAGE,
+        RECEIVE_CLIENT_FINAL_MESSAGE,
+        COMPLETE,
+        FAILED
+    };
+
+    private final ScramMechanism mechanism;
+    private final ScramFormatter formatter;
+    private final CallbackHandler callbackHandler;
+    private State state;
+    private String username;
+    private ClientFirstMessage clientFirstMessage;
+    private ServerFirstMessage serverFirstMessage;
+    private String serverNonce;
+    private ScramCredential scramCredential;
+
+    public ScramSaslServer(ScramMechanism mechanism, Map<String, ?> props, CallbackHandler callbackHandler) throws NoSuchAlgorithmException {
+        this.mechanism = mechanism;
+        this.formatter = new ScramFormatter(mechanism);
+        this.callbackHandler = callbackHandler;
+        setState(State.RECEIVE_CLIENT_FIRST_MESSAGE);
+    }
+
+    @Override
+    public byte[] evaluateResponse(byte[] response) throws SaslException {
+        try {
+            switch (state) {
+                case RECEIVE_CLIENT_FIRST_MESSAGE:
+                    this.clientFirstMessage = new ClientFirstMessage(response);
+                    serverNonce = formatter.secureRandomString();
+                    try {
+                        String saslName = clientFirstMessage.saslName();
+                        this.username = formatter.username(saslName);
+                        NameCallback nameCallback = new NameCallback("username", username);
+                        ScramCredentialCallback credentialCallback = new ScramCredentialCallback();
+                        callbackHandler.handle(new Callback[]{nameCallback, credentialCallback});
+                        this.scramCredential = credentialCallback.scramCredential();
+                        if (scramCredential == null)
+                            throw new SaslException("Authentication failed: Invalid user credentials");
+                        if (scramCredential.iterations() < mechanism.minIterations())
+                            throw new SaslException("Iterations " + scramCredential.iterations() +  " is less than the minimum " + mechanism.minIterations() + " for " + mechanism);
+                        this.serverFirstMessage = new ServerFirstMessage(clientFirstMessage.nonce(),
+                                serverNonce,
+                                scramCredential.salt(),
+                                scramCredential.iterations());
+                        setState(State.RECEIVE_CLIENT_FINAL_MESSAGE);
+                        return serverFirstMessage.toBytes();
+                    } catch (IOException | NumberFormatException | UnsupportedCallbackException e) {
+                        throw new SaslException("Authentication failed: Credentials could not be obtained", e);
+                    }
+
+                case RECEIVE_CLIENT_FINAL_MESSAGE:
+                    try {
+                        ClientFinalMessage clientFinalMessage = new ClientFinalMessage(response);
+                        verifyClientProof(clientFinalMessage);
+                        byte[] serverKey = scramCredential.serverKey();
+                        byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage);
+                        ServerFinalMessage serverFinalMessage = new ServerFinalMessage(null, serverSignature);
+                        setState(State.COMPLETE);
+                        return serverFinalMessage.toBytes();
+                    } catch (InvalidKeyException e) {
+                        throw new SaslException("Authentication failed: Invalid client final message", e);
+                    }
+
+                default:
+                    throw new IllegalSaslStateException("Unexpected challenge in Sasl server state " + state);
+            }
+        } catch (SaslException e) {
+            setState(State.FAILED);
+            throw e;
+        }
+    }
+
+    @Override
+    public String getAuthorizationID() {
+        if (!isComplete())
+            throw new IllegalStateException("Authentication exchange has not completed");
+        String authzId = clientFirstMessage.authorizationId();
+        return authzId == null || authzId.length() == 0 ? username : authzId;
+    }
+
+    @Override
+    public String getMechanismName() {
+        return mechanism.mechanismName();
+    }
+
+    @Override
+    public Object getNegotiatedProperty(String propName) {
+        if (!isComplete())
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return null;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return state == State.COMPLETE;
+    }
+
+    @Override
+    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+        if (!isComplete())
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return Arrays.copyOfRange(incoming, offset, offset + len);
+    }
+
+    @Override
+    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+        if (!isComplete())
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return Arrays.copyOfRange(outgoing, offset, offset + len);
+    }
+
+    @Override
+    public void dispose() throws SaslException {
+    }
+
+    private void setState(State state) {
+        log.debug("Setting SASL/{} server state to {}", mechanism, state);
+        this.state = state;
+    }
+
+    private void verifyClientProof(ClientFinalMessage clientFinalMessage) throws SaslException {
+        try {
+            byte[] expectedStoredKey = scramCredential.storedKey();
+            byte[] clientSignature = formatter.clientSignature(expectedStoredKey, clientFirstMessage, serverFirstMessage, clientFinalMessage);
+            byte[] computedStoredKey = formatter.storedKey(clientSignature, clientFinalMessage.proof());
+            if (!Arrays.equals(computedStoredKey, expectedStoredKey))
+                throw new SaslException("Invalid client credentials");
+        } catch (InvalidKeyException e) {
+            throw new SaslException("Sasl client verification failed", e);
+        }
+    }
+
+    public static class ScramSaslServerFactory implements SaslServerFactory {
+
+        @Override
+        public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh)
+            throws SaslException {
+
+            if (!ScramMechanism.isScram(mechanism)) {
+                throw new SaslException(String.format("Requested mechanism '%s' is not supported. Supported mechanisms are '%s'.",
+                        mechanism, ScramMechanism.mechanismNames()));
+            }
+            try {
+                return new ScramSaslServer(ScramMechanism.forMechanismName(mechanism), props, cbh);
+            } catch (NoSuchAlgorithmException e) {
+                throw new SaslException("Hash algorithm not supported for mechanism " + mechanism, e);
+            }
+        }
+
+        @Override
+        public String[] getMechanismNames(Map<String, ?> props) {
+            Collection<String> mechanisms = ScramMechanism.mechanismNames();
+            return mechanisms.toArray(new String[mechanisms.size()]);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java
new file mode 100644
index 0000000..496cb5f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import java.security.Provider;
+import java.security.Security;
+
+import org.apache.kafka.common.security.scram.ScramSaslServer.ScramSaslServerFactory;
+
+public class ScramSaslServerProvider extends Provider {
+
+    private static final long serialVersionUID = 1L;
+
+    protected ScramSaslServerProvider() {
+        super("SASL/SCRAM Server Provider", 1.0, "SASL/SCRAM Server Provider for Kafka");
+        for (ScramMechanism mechanism : ScramMechanism.values())
+            super.put("SaslServerFactory." + mechanism.mechanismName(), ScramSaslServerFactory.class.getName());
+    }
+
+    public static void initialize() {
+        Security.addProvider(new ScramSaslServerProvider());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
new file mode 100644
index 0000000..46bfe57
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+
+public class ScramServerCallbackHandler implements AuthCallbackHandler {
+
+    private final CredentialCache.Cache<ScramCredential> credentialCache;
+
+    public ScramServerCallbackHandler(CredentialCache.Cache<ScramCredential> credentialCache) {
+        this.credentialCache = credentialCache;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        String username = null;
+        for (Callback callback : callbacks) {
+            if (callback instanceof NameCallback)
+                username = ((NameCallback) callback).getDefaultName();
+            else if (callback instanceof ScramCredentialCallback)
+                ((ScramCredentialCallback) callback).scramCredential(credentialCache.get(username));
+            else
+                throw new UnsupportedCallbackException(callback);
+        }
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism) {
+    }
+
+    @Override
+    public void close() {
+    }
+}
\ No newline at end of file


Mime
View raw message