kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/2] kafka git commit: KAFKA-3751; SASL/SCRAM implementation
Date Tue, 10 Jan 2017 13:05:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1fea1c390 -> 275c5e1df


http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index cc4befa..0bff0ee 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -27,6 +27,9 @@ import java.util.Map;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.scram.ScramCredentialUtils;
+import org.apache.kafka.common.security.scram.ScramMechanism;
 import org.apache.kafka.common.utils.MockTime;
 
 /**
@@ -42,6 +45,7 @@ public class NioEchoServer extends Thread {
     private final AcceptorThread acceptorThread;
     private final Selector selector;
     private volatile WritableByteChannel outputChannel;
+    private final CredentialCache credentialCache;
 
     public NioEchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs, String serverHost) throws Exception {
         serverSocketChannel = ServerSocketChannel.open();
@@ -50,7 +54,10 @@ public class NioEchoServer extends Thread {
         this.port = serverSocketChannel.socket().getLocalPort();
         this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
         this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
-        ChannelBuilder channelBuilder = ChannelBuilders.create(securityProtocol, Mode.SERVER, LoginType.SERVER, configs, null, true);
+        this.credentialCache = new CredentialCache();
+        if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL)
+            ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
+        ChannelBuilder channelBuilder = ChannelBuilders.serverChannelBuilder(securityProtocol, configs, credentialCache);
         this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
         setName("echoserver");
         setDaemon(true);
@@ -61,6 +68,10 @@ public class NioEchoServer extends Thread {
         return port;
     }
 
+    public CredentialCache credentialCache() {
+        return credentialCache;
+    }
+
     @Override
     public void run() {
         try {
@@ -162,4 +173,4 @@ public class NioEchoServer extends Thread {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index f75d5b7..f8b57f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -19,7 +19,6 @@ import org.apache.kafka.common.network.CertStores;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.network.LoginType;
-import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkTestUtils;
 import org.apache.kafka.common.network.NioEchoServer;
@@ -40,6 +39,10 @@ import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.ScramFormatter;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
+import org.apache.kafka.common.security.scram.ScramMechanism;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,6 +50,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -209,8 +214,9 @@ public class SaslAuthenticatorTest {
     @Test
     public void testMultipleServerMechanisms() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
-        configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN"));
+        configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN", "SCRAM-SHA-256"));
         server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
 
         String node1 = "1";
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
@@ -222,6 +228,125 @@ public class SaslAuthenticatorTest {
         InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
         selector.connect(node2, addr, BUFFER_SIZE, BUFFER_SIZE);
         NetworkTestUtils.checkClientConnection(selector, node2, 100, 10);
+        selector.close();
+
+        String node3 = "3";
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
+        createSelector(securityProtocol, saslClientConfigs);
+        selector.connect(node3, new InetSocketAddress("127.0.0.1", server.port()), BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.checkClientConnection(selector, node3, 100, 10);
+        selector.close();
+        selector = null;
+    }
+
+    /**
+     * Tests good path SASL/SCRAM-SHA-256 client and server channels.
+     */
+    @Test
+    public void testValidSaslScramSha256() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
+
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
+        createAndCheckClientConnection(securityProtocol, "0");
+    }
+
+    /**
+     * Tests all supported SCRAM client and server channels. Also tests that all
+     * supported SCRAM mechanisms can be supported simultaneously on a server.
+     */
+    @Test
+    public void testValidSaslScramMechanisms() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("SCRAM-SHA-256", new ArrayList<>(ScramMechanism.mechanismNames()));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
+
+        for (String mechanism : ScramMechanism.mechanismNames()) {
+            saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, mechanism);
+            createAndCheckClientConnection(securityProtocol, "node-" + mechanism);
+        }
+    }
+
+    /**
+     * Tests that SASL/SCRAM clients fail authentication if password is invalid.
+     */
+    @Test
+    public void testInvalidPasswordSaslScram() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        TestJaasConfig jaasConfig = configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
+        Map<String, Object> options = new HashMap<>();
+        options.put("username", TestJaasConfig.USERNAME);
+        options.put("password", "invalidpassword");
+        jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
+
+        String node = "0";
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
+        createClientConnection(securityProtocol, node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
+    }
+
+    /**
+     * Tests that SASL/SCRAM clients without valid username fail authentication.
+     */
+    @Test
+    public void testUnknownUserSaslScram() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        TestJaasConfig jaasConfig = configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
+        Map<String, Object> options = new HashMap<>();
+        options.put("username", "unknownUser");
+        options.put("password", TestJaasConfig.PASSWORD);
+        jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
+
+        String node = "0";
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
+        createClientConnection(securityProtocol, node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
+    }
+
+    /**
+     * Tests that SASL/SCRAM clients fail authentication if credentials are not available for
+     * the specific SCRAM mechanism.
+     */
+    @Test
+    public void testUserCredentialsUnavailableForScramMechanism() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("SCRAM-SHA-256", new ArrayList<>(ScramMechanism.mechanismNames()));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
+
+        server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME);
+        String node = "1";
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
+        createClientConnection(securityProtocol, node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
+        selector.close();
+
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+        createAndCheckClientConnection(securityProtocol, "2");
+    }
+
+    /**
+     * Tests SASL/SCRAM with username containing characters that need
+     * to be encoded.
+     */
+    @Test
+    public void testScramUsernameWithSpecialCharacters() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        String username = "special user= test,scram";
+        String password = username + "-password";
+        TestJaasConfig jaasConfig = configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
+        Map<String, Object> options = new HashMap<>();
+        options.put("username", username);
+        options.put("password", password);
+        jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
+
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        updateScramCredentialCache(username, password);
+        createAndCheckClientConnection(securityProtocol, "0");
     }
 
     /**
@@ -606,7 +731,7 @@ public class SaslAuthenticatorTest {
 
     private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) {
         String saslMechanism = (String) saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
-        this.channelBuilder = ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, clientConfigs, saslMechanism, true);
+        this.channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, LoginType.CLIENT, clientConfigs, saslMechanism, true);
         this.selector = NetworkTestUtils.createSelector(channelBuilder);
     }
 
@@ -660,4 +785,16 @@ public class SaslAuthenticatorTest {
         assertEquals(1, selector.completedReceives().size());
         return selector.completedReceives().get(0).payload();
     }
+
+    @SuppressWarnings("unchecked")
+    private void updateScramCredentialCache(String username, String password) throws NoSuchAlgorithmException {
+        for (String mechanism : (List<String>) saslServerConfigs.get(SaslConfigs.SASL_ENABLED_MECHANISMS)) {
+            ScramMechanism scramMechanism = ScramMechanism.forMechanismName(mechanism);
+            if (scramMechanism != null) {
+                ScramFormatter formatter = new ScramFormatter(scramMechanism);
+                ScramCredential credential = formatter.generateCredential(password, 4096);
+                server.credentialCache().cache(scramMechanism.mechanismName(), ScramCredential.class).put(username, credential);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
index 22a3267..a27b87a 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -23,6 +24,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
+import org.apache.kafka.common.security.scram.ScramMechanism;
 
 public class TestJaasConfig extends Configuration {
 
@@ -35,7 +38,7 @@ public class TestJaasConfig extends Configuration {
         TestJaasConfig config = new TestJaasConfig();
         config.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, loginModule(clientMechanism), defaultClientOptions());
         for (String mechanism : serverMechanisms) {
-            config.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_SERVER, loginModule(mechanism), defaultServerOptions());
+            config.addEntry(JaasUtils.LOGIN_CONTEXT_SERVER, loginModule(mechanism), defaultServerOptions(mechanism));
         }
         Configuration.setConfiguration(config);
         return config;
@@ -59,6 +62,14 @@ public class TestJaasConfig extends Configuration {
         entryMap.put(name, new AppConfigurationEntry[] {entry});
     }
 
+    public void addEntry(String name, String loginModule, Map<String, Object> options) {
+        AppConfigurationEntry entry = new AppConfigurationEntry(loginModule, LoginModuleControlFlag.REQUIRED, options);
+        AppConfigurationEntry[] existing = entryMap.get(name);
+        AppConfigurationEntry[] newEntries = existing == null ? new AppConfigurationEntry[1] : Arrays.copyOf(existing, existing.length + 1);
+        newEntries[newEntries.length - 1] = entry;
+        entryMap.put(name, newEntries);
+    }
+
     @Override
     public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
         return entryMap.get(name);
@@ -74,7 +85,10 @@ public class TestJaasConfig extends Configuration {
                 loginModule = TestDigestLoginModule.class.getName();
                 break;
             default:
-                throw new IllegalArgumentException("Unsupported mechanism " + mechanism);
+                if (ScramMechanism.isScram(mechanism))
+                    loginModule = ScramLoginModule.class.getName();
+                else
+                    throw new IllegalArgumentException("Unsupported mechanism " + mechanism);
         }
         return loginModule;
     }
@@ -86,9 +100,17 @@ public class TestJaasConfig extends Configuration {
         return options;
     }
 
-    public static Map<String, Object> defaultServerOptions() {
+    public static Map<String, Object> defaultServerOptions(String mechanism) {
         Map<String, Object> options = new HashMap<>();
-        options.put("user_" + USERNAME, PASSWORD);
+        switch (mechanism) {
+            case "PLAIN":
+            case "DIGEST-MD5":
+                options.put("user_" + USERNAME, PASSWORD);
+                break;
+            default:
+                if (!ScramMechanism.isScram(mechanism))
+                    throw new IllegalArgumentException("Unsupported mechanism " + mechanism);
+        }
         return options;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/test/java/org/apache/kafka/common/security/scram/ScramCredentialUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramCredentialUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramCredentialUtilsTest.java
new file mode 100644
index 0000000..9acc16e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramCredentialUtilsTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.junit.Before;
+
+public class ScramCredentialUtilsTest {
+
+    private ScramFormatter formatter;
+
+    @Before
+    public void setUp() throws NoSuchAlgorithmException {
+        formatter = new ScramFormatter(ScramMechanism.SCRAM_SHA_256);
+    }
+
+    @Test
+    public void stringConversion() {
+        ScramCredential credential = formatter.generateCredential("password", 1024);
+        assertTrue("Salt must not be empty", credential.salt().length > 0);
+        assertTrue("Stored key must not be empty", credential.storedKey().length > 0);
+        assertTrue("Server key must not be empty", credential.serverKey().length > 0);
+        ScramCredential credential2 = ScramCredentialUtils.credentialFromString(ScramCredentialUtils.credentialToString(credential));
+        assertArrayEquals(credential.salt(), credential2.salt());
+        assertArrayEquals(credential.storedKey(), credential2.storedKey());
+        assertArrayEquals(credential.serverKey(), credential2.serverKey());
+        assertEquals(credential.iterations(), credential2.iterations());
+    }
+
+    @Test
+    public void generateCredential() {
+        ScramCredential credential1 = formatter.generateCredential("password", 4096);
+        ScramCredential credential2 = formatter.generateCredential("password", 4096);
+        // Random salt should ensure that the credentials persisted are different every time
+        assertNotEquals(ScramCredentialUtils.credentialToString(credential1), ScramCredentialUtils.credentialToString(credential2));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void invalidCredential() {
+        ScramCredentialUtils.credentialFromString("abc");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void missingFields() {
+        String cred = ScramCredentialUtils.credentialToString(formatter.generateCredential("password", 2048));
+        ScramCredentialUtils.credentialFromString(cred.substring(cred.indexOf(',')));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void extraneousFields() {
+        String cred = ScramCredentialUtils.credentialToString(formatter.generateCredential("password", 2048));
+        ScramCredentialUtils.credentialFromString(cred + ",a=test");
+    }
+
+    @Test
+    public void scramCredentialCache() throws Exception {
+        CredentialCache cache = new CredentialCache();
+        ScramCredentialUtils.createCache(cache, Arrays.asList("SCRAM-SHA-512", "PLAIN"));
+        assertNotNull("Cache not created for enabled mechanism", cache.cache(ScramMechanism.SCRAM_SHA_512.mechanismName(), ScramCredential.class));
+        assertNull("Cache created for disabled mechanism", cache.cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class));
+
+        CredentialCache.Cache<ScramCredential> sha512Cache = cache.cache(ScramMechanism.SCRAM_SHA_512.mechanismName(), ScramCredential.class);
+        ScramFormatter formatter = new ScramFormatter(ScramMechanism.SCRAM_SHA_512);
+        ScramCredential credentialA = formatter.generateCredential("password", 4096);
+        sha512Cache.put("userA", credentialA);
+        assertEquals(credentialA, sha512Cache.get("userA"));
+        assertNull("Invalid user credential", sha512Cache.get("userB"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java
new file mode 100644
index 0000000..bb88f8f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import org.junit.Test;
+
+import javax.xml.bind.DatatypeConverter;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ServerFirstMessage;
+
+public class ScramFormatterTest {
+
+    /**
+     * Tests that the formatter implementation produces the same values for the
+     * example included in <a href="https://tools.ietf.org/html/rfc7677#section-3">RFC 7677</a>
+     */
+    @Test
+    public void rfc7677Example() throws Exception {
+        ScramFormatter formatter = new ScramFormatter(ScramMechanism.SCRAM_SHA_256);
+
+        String password = "pencil";
+        String c1 = "n,,n=user,r=rOprNGfwEbeRWgbNEkqO";
+        String s1 = "r=rOprNGfwEbeRWgbNEkqO%hvYDpWUa2RaTCAfuxFIlj)hNlF$k0,s=W22ZaJ0SNY7soEsUEjb6gQ==,i=4096";
+        String c2 = "c=biws,r=rOprNGfwEbeRWgbNEkqO%hvYDpWUa2RaTCAfuxFIlj)hNlF$k0,p=dHzbZapWIk4jUhN+Ute9ytag9zjfMHgsqmmiz7AndVQ=";
+        String s2 = "v=6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4=";
+        ClientFirstMessage clientFirst = new ClientFirstMessage(formatter.toBytes(c1));
+        ServerFirstMessage serverFirst = new ServerFirstMessage(formatter.toBytes(s1));
+        ClientFinalMessage clientFinal = new ClientFinalMessage(formatter.toBytes(c2));
+        ServerFinalMessage serverFinal = new ServerFinalMessage(formatter.toBytes(s2));
+
+        String username = clientFirst.saslName();
+        assertEquals("user", username);
+        String clientNonce = clientFirst.nonce();
+        assertEquals("rOprNGfwEbeRWgbNEkqO", clientNonce);
+        String serverNonce = serverFirst.nonce().substring(clientNonce.length());
+        assertEquals("%hvYDpWUa2RaTCAfuxFIlj)hNlF$k0", serverNonce);
+        byte[] salt = serverFirst.salt();
+        assertArrayEquals(DatatypeConverter.parseBase64Binary("W22ZaJ0SNY7soEsUEjb6gQ=="), salt);
+        int iterations = serverFirst.iterations();
+        assertEquals(4096, iterations);
+        byte[] channelBinding = clientFinal.channelBinding();
+        assertArrayEquals(DatatypeConverter.parseBase64Binary("biws"), channelBinding);
+        byte[] serverSignature = serverFinal.serverSignature();
+        assertArrayEquals(DatatypeConverter.parseBase64Binary("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature);
+
+        byte[] saltedPassword = formatter.saltedPassword(password, salt, iterations);
+        byte[] serverKey = formatter.serverKey(saltedPassword);
+        byte[] computedProof = formatter.clientProof(saltedPassword, clientFirst, serverFirst, clientFinal);
+        assertArrayEquals(clientFinal.proof(), computedProof);
+        byte[] computedSignature = formatter.serverSignature(serverKey, clientFirst, serverFirst, clientFinal);
+        assertArrayEquals(serverFinal.serverSignature(), computedSignature);
+
+        // Minimum iterations defined in RFC-7677
+        assertEquals(4096, ScramMechanism.SCRAM_SHA_256.minIterations());
+    }
+
+    /**
+     * Tests encoding of username
+     */
+    @Test
+    public void saslName() throws Exception {
+        String[] usernames = {"user1", "123", "1,2", "user=A", "user==B", "user,1", "user 1", ",", "=", ",=", "=="};
+        ScramFormatter formatter = new ScramFormatter(ScramMechanism.SCRAM_SHA_256);
+        for (String username : usernames) {
+            String saslName = formatter.saslName(username);
+            // There should be no commas in saslName (comma is used as field separator in SASL messages)
+            assertEquals(-1, saslName.indexOf(','));
+            // There should be no "=" in the saslName apart from those used in encoding (comma is =2C and equals is =3D)
+            assertEquals(-1, saslName.replace("=2C", "").replace("=3D", "").indexOf('='));
+            assertEquals(username, formatter.username(saslName));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
new file mode 100644
index 0000000..dddc4d3
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.scram;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import javax.security.sasl.SaslException;
+import javax.xml.bind.DatatypeConverter;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import org.apache.kafka.common.security.scram.ScramMessages.AbstractScramMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage;
+import org.apache.kafka.common.security.scram.ScramMessages.ServerFirstMessage;
+
+/**
+ * Tests parsing, validation and serialization of the four SCRAM SASL messages
+ * defined in RFC-5802: client-first, server-first, client-final and
+ * server-final.
+ */
+public class ScramMessagesTest {
+
+    private static final String[] VALID_EXTENSIONS = {
+        "ext=val1",
+        "anotherext=name1=value1 name2=another test value \"\'!$[]()",
+        "first=val1,second=name1 = value ,third=123"
+    };
+    // NOTE(review): "ext1=value" is listed as invalid, presumably because extension
+    // keys may contain only letters — confirm against the ScramMessages parser.
+    private static final String[] INVALID_EXTENSIONS = {
+        "ext1=value",
+        "ext",
+        "ext=value1,value2",
+        "ext=,",
+        "ext =value"
+    };
+
+    private static final String[] VALID_RESERVED = {
+        "m=reserved-value",
+        "m=name1=value1 name2=another test value \"\'!$[]()"
+    };
+    private static final String[] INVALID_RESERVED = {
+        "m",
+        "m=name,value",
+        "m=,"
+    };
+
+    private ScramFormatter formatter;
+
+    @Before
+    public void setUp() throws Exception {
+        formatter = new ScramFormatter(ScramMechanism.SCRAM_SHA_256);
+    }
+
+    @Test
+    public void validClientFirstMessage() throws SaslException {
+        String nonce = formatter.secureRandomString();
+        ClientFirstMessage m = new ClientFirstMessage("someuser", nonce);
+        checkClientFirstMessage(m, "someuser", nonce, "");
+
+        // Default format used by Kafka client: only user and nonce are specified
+        String str = String.format("n,,n=testuser,r=%s", nonce);
+        m = createScramMessage(ClientFirstMessage.class, str);
+        checkClientFirstMessage(m, "testuser", nonce, "");
+        m = new ClientFirstMessage(m.toBytes());
+        checkClientFirstMessage(m, "testuser", nonce, "");
+
+        // Username containing comma, encoded as =2C
+        str = String.format("n,,n=test=2Cuser,r=%s", nonce);
+        m = createScramMessage(ClientFirstMessage.class, str);
+        checkClientFirstMessage(m, "test=2Cuser", nonce, "");
+        assertEquals("test,user", formatter.username(m.saslName()));
+
+        // Username containing equals, encoded as =3D
+        str = String.format("n,,n=test=3Duser,r=%s", nonce);
+        m = createScramMessage(ClientFirstMessage.class, str);
+        checkClientFirstMessage(m, "test=3Duser", nonce, "");
+        assertEquals("test=user", formatter.username(m.saslName()));
+
+        // Optional authorization id specified
+        str = String.format("n,a=testauthzid,n=testuser,r=%s", nonce);
+        checkClientFirstMessage(createScramMessage(ClientFirstMessage.class, str), "testuser", nonce, "testauthzid");
+
+        // Optional reserved value specified
+        for (String reserved : VALID_RESERVED) {
+            str = String.format("n,,%s,n=testuser,r=%s", reserved, nonce);
+            checkClientFirstMessage(createScramMessage(ClientFirstMessage.class, str), "testuser", nonce, "");
+        }
+
+        // Optional extension specified
+        for (String extension : VALID_EXTENSIONS) {
+            str = String.format("n,,n=testuser,r=%s,%s", nonce, extension);
+            checkClientFirstMessage(createScramMessage(ClientFirstMessage.class, str), "testuser", nonce, "");
+        }
+    }
+
+    @Test
+    public void invalidClientFirstMessage() throws SaslException {
+        String nonce = formatter.secureRandomString();
+        // Invalid entry in gs2-header
+        String invalid = String.format("n,x=something,n=testuser,r=%s", nonce);
+        checkInvalidScramMessage(ClientFirstMessage.class, invalid);
+
+        // Invalid reserved entry
+        for (String reserved : INVALID_RESERVED) {
+            invalid = String.format("n,,%s,n=testuser,r=%s", reserved, nonce);
+            checkInvalidScramMessage(ClientFirstMessage.class, invalid);
+        }
+
+        // Invalid extension
+        for (String extension : INVALID_EXTENSIONS) {
+            invalid = String.format("n,,n=testuser,r=%s,%s", nonce, extension);
+            checkInvalidScramMessage(ClientFirstMessage.class, invalid);
+        }
+    }
+
+    @Test
+    public void validServerFirstMessage() throws SaslException {
+        String clientNonce = formatter.secureRandomString();
+        String serverNonce = formatter.secureRandomString();
+        String nonce = clientNonce + serverNonce;
+        String salt = randomBytesAsString();
+
+        ServerFirstMessage m = new ServerFirstMessage(clientNonce, serverNonce, toBytes(salt), 8192);
+        checkServerFirstMessage(m, nonce, salt, 8192);
+
+        // Default format used by Kafka clients, only nonce, salt and iterations are specified
+        String str = String.format("r=%s,s=%s,i=4096", nonce, salt);
+        m = createScramMessage(ServerFirstMessage.class, str);
+        checkServerFirstMessage(m, nonce, salt, 4096);
+        m = new ServerFirstMessage(m.toBytes());
+        checkServerFirstMessage(m, nonce, salt, 4096);
+
+        // Optional reserved value
+        for (String reserved : VALID_RESERVED) {
+            str = String.format("%s,r=%s,s=%s,i=4096", reserved, nonce, salt);
+            checkServerFirstMessage(createScramMessage(ServerFirstMessage.class, str), nonce, salt, 4096);
+        }
+
+        // Optional extension
+        for (String extension : VALID_EXTENSIONS) {
+            str = String.format("r=%s,s=%s,i=4096,%s", nonce, salt, extension);
+            checkServerFirstMessage(createScramMessage(ServerFirstMessage.class, str), nonce, salt, 4096);
+        }
+    }
+
+    @Test
+    public void invalidServerFirstMessage() throws SaslException {
+        String nonce = formatter.secureRandomString();
+        String salt = randomBytesAsString();
+
+        // Invalid iterations
+        String invalid = String.format("r=%s,s=%s,i=0", nonce, salt);
+        checkInvalidScramMessage(ServerFirstMessage.class, invalid);
+
+        // Invalid salt
+        invalid = String.format("r=%s,s=%s,i=4096", nonce, "=123");
+        checkInvalidScramMessage(ServerFirstMessage.class, invalid);
+
+        // Invalid format
+        invalid = String.format("r=%s,invalid,s=%s,i=4096", nonce, salt);
+        checkInvalidScramMessage(ServerFirstMessage.class, invalid);
+
+        // Invalid reserved entry
+        for (String reserved : INVALID_RESERVED) {
+            invalid = String.format("%s,r=%s,s=%s,i=4096", reserved, nonce, salt);
+            checkInvalidScramMessage(ServerFirstMessage.class, invalid);
+        }
+
+        // Invalid extension
+        for (String extension : INVALID_EXTENSIONS) {
+            invalid = String.format("r=%s,s=%s,i=4096,%s", nonce, salt, extension);
+            checkInvalidScramMessage(ServerFirstMessage.class, invalid);
+        }
+    }
+
+    @Test
+    public void validClientFinalMessage() throws SaslException {
+        String nonce = formatter.secureRandomString();
+        String channelBinding = randomBytesAsString();
+        String proof = randomBytesAsString();
+
+        ClientFinalMessage m = new ClientFinalMessage(toBytes(channelBinding), nonce);
+        assertNull("Invalid proof", m.proof());
+        m.proof(toBytes(proof));
+        checkClientFinalMessage(m, channelBinding, nonce, proof);
+
+        // Default format used by Kafka client: channel-binding, nonce and proof are specified
+        String str = String.format("c=%s,r=%s,p=%s", channelBinding, nonce, proof);
+        m = createScramMessage(ClientFinalMessage.class, str);
+        checkClientFinalMessage(m, channelBinding, nonce, proof);
+        m = new ClientFinalMessage(m.toBytes());
+        checkClientFinalMessage(m, channelBinding, nonce, proof);
+
+        // Optional extension specified
+        for (String extension : VALID_EXTENSIONS) {
+            str = String.format("c=%s,r=%s,%s,p=%s", channelBinding, nonce, extension, proof);
+            checkClientFinalMessage(createScramMessage(ClientFinalMessage.class, str), channelBinding, nonce, proof);
+        }
+    }
+
+    @Test
+    public void invalidClientFinalMessage() throws SaslException {
+        String nonce = formatter.secureRandomString();
+        String channelBinding = randomBytesAsString();
+        String proof = randomBytesAsString();
+
+        // Invalid channel binding
+        // (fixed: these invalid client-final messages must be parsed as ClientFinalMessage,
+        // not ClientFirstMessage, so the client-final parser is the one being exercised)
+        String invalid = String.format("c=ab,r=%s,p=%s", nonce, proof);
+        checkInvalidScramMessage(ClientFinalMessage.class, invalid);
+
+        // Invalid proof
+        invalid = String.format("c=%s,r=%s,p=123", channelBinding, nonce);
+        checkInvalidScramMessage(ClientFinalMessage.class, invalid);
+
+        // Invalid extensions
+        for (String extension : INVALID_EXTENSIONS) {
+            invalid = String.format("c=%s,r=%s,%s,p=%s", channelBinding, nonce, extension, proof);
+            checkInvalidScramMessage(ClientFinalMessage.class, invalid);
+        }
+    }
+
+    @Test
+    public void validServerFinalMessage() throws SaslException {
+        String serverSignature = randomBytesAsString();
+
+        ServerFinalMessage m = new ServerFinalMessage("unknown-user", null);
+        checkServerFinalMessage(m, "unknown-user", null);
+        m = new ServerFinalMessage(null, toBytes(serverSignature));
+        checkServerFinalMessage(m, null, serverSignature);
+
+        // Default format used by Kafka clients for successful final message
+        String str = String.format("v=%s", serverSignature);
+        m = createScramMessage(ServerFinalMessage.class, str);
+        checkServerFinalMessage(m, null, serverSignature);
+        m = new ServerFinalMessage(m.toBytes());
+        checkServerFinalMessage(m, null, serverSignature);
+
+        // Default format used by Kafka clients for final message with error
+        // (fixed: no-op String.format with an unused argument replaced by a plain literal)
+        str = "e=other-error";
+        m = createScramMessage(ServerFinalMessage.class, str);
+        checkServerFinalMessage(m, "other-error", null);
+        m = new ServerFinalMessage(m.toBytes());
+        checkServerFinalMessage(m, "other-error", null);
+
+        // Optional extension
+        for (String extension : VALID_EXTENSIONS) {
+            str = String.format("v=%s,%s", serverSignature, extension);
+            checkServerFinalMessage(createScramMessage(ServerFinalMessage.class, str), null, serverSignature);
+        }
+    }
+
+    @Test
+    public void invalidServerFinalMessage() throws SaslException {
+        String serverSignature = randomBytesAsString();
+
+        // Invalid error
+        String invalid = "e=error1,error2";
+        checkInvalidScramMessage(ServerFinalMessage.class, invalid);
+
+        // Invalid server signature (fixed: String.format with no format specifiers was a no-op)
+        invalid = "v=1=23";
+        checkInvalidScramMessage(ServerFinalMessage.class, invalid);
+
+        // Invalid extensions
+        for (String extension : INVALID_EXTENSIONS) {
+            invalid = String.format("v=%s,%s", serverSignature, extension);
+            checkInvalidScramMessage(ServerFinalMessage.class, invalid);
+
+            invalid = String.format("e=unknown-user,%s", extension);
+            checkInvalidScramMessage(ServerFinalMessage.class, invalid);
+        }
+    }
+
+    // Returns a random byte sequence encoded as base64, as used for salts, proofs and signatures
+    private String randomBytesAsString() {
+        return DatatypeConverter.printBase64Binary(formatter.secureRandomBytes());
+    }
+
+    // Decodes a base64 string produced by randomBytesAsString back to raw bytes
+    private byte[] toBytes(String base64Str) {
+        return DatatypeConverter.parseBase64Binary(base64Str);
+    }
+
+    private void checkClientFirstMessage(ClientFirstMessage message, String saslName, String nonce, String authzid) {
+        assertEquals(saslName, message.saslName());
+        assertEquals(nonce, message.nonce());
+        assertEquals(authzid, message.authorizationId());
+    }
+
+    private void checkServerFirstMessage(ServerFirstMessage message, String nonce, String salt, int iterations) {
+        assertEquals(nonce, message.nonce());
+        assertArrayEquals(DatatypeConverter.parseBase64Binary(salt), message.salt());
+        assertEquals(iterations, message.iterations());
+    }
+
+    private void checkClientFinalMessage(ClientFinalMessage message, String channelBinding, String nonce, String proof) {
+        assertArrayEquals(DatatypeConverter.parseBase64Binary(channelBinding), message.channelBinding());
+        assertEquals(nonce, message.nonce());
+        assertArrayEquals(DatatypeConverter.parseBase64Binary(proof), message.proof());
+    }
+
+    private void checkServerFinalMessage(ServerFinalMessage message, String error, String serverSignature) {
+        assertEquals(error, message.error());
+        if (serverSignature == null)
+            assertNull("Unexpected server signature", message.serverSignature());
+        else
+            assertArrayEquals(DatatypeConverter.parseBase64Binary(serverSignature), message.serverSignature());
+    }
+
+    // Parses `message` using the constructor of the requested SCRAM message type
+    @SuppressWarnings("unchecked")
+    private <T extends AbstractScramMessage> T createScramMessage(Class<T> clazz, String message) throws SaslException {
+        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
+        if (clazz == ClientFirstMessage.class)
+            return (T) new ClientFirstMessage(bytes);
+        else if (clazz == ServerFirstMessage.class)
+            return (T) new ServerFirstMessage(bytes);
+        else if (clazz == ClientFinalMessage.class)
+            return (T) new ClientFinalMessage(bytes);
+        else if (clazz == ServerFinalMessage.class)
+            return (T) new ServerFinalMessage(bytes);
+        else
+            throw new IllegalArgumentException("Unknown message type: " + clazz);
+    }
+
+    // Asserts that parsing `message` as the given SCRAM message type fails with SaslException
+    private <T extends AbstractScramMessage> void checkInvalidScramMessage(Class<T> clazz, String message) {
+        try {
+            createScramMessage(clazz, message);
+            fail("Exception not thrown for invalid message of type " + clazz + " : " + message);
+        } catch (SaslException e) {
+            // Expected exception
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index e95d327..91cd426 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import kafka.common._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
-import kafka.server.{DynamicConfig, ConfigType}
+import kafka.server.{DynamicConfig, ConfigEntityName, ConfigType}
 import kafka.utils._
 import kafka.utils.ZkUtils._
 import java.util.Random
@@ -504,7 +504,10 @@ object AdminUtils extends Logging with AdminUtilities {
    *
    */
   def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
-    DynamicConfig.Client.validate(configs)
+    // The <default> user entity and <user>/clients/<client-id> sub-entities carry only
+    // client quota overrides; a specific user entity may also carry SCRAM credential
+    // configs, which only DynamicConfig.User accepts.
+    if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
+      DynamicConfig.Client.validate(configs)
+    else
+      DynamicConfig.User.validate(configs)
     changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index a6176a2..3985490 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -25,6 +25,7 @@ import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId}
 import kafka.utils.{CommandLineUtils, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.security.scram._
 import org.apache.kafka.common.utils.Utils
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -46,6 +47,8 @@ import scala.collection.JavaConverters._
  */
 object ConfigCommand extends Config {
 
+  val DefaultScramIterations = 4096
+
   def main(args: Array[String]): Unit = {
 
     val opts = new ConfigCommandOptions(args)
@@ -81,6 +84,9 @@ object ConfigCommand extends Config {
     val entityType = entity.root.entityType
     val entityName = entity.fullSanitizedName
 
+    if (entityType == ConfigType.User)
+      preProcessScramCredentials(configsToBeAdded)
+
     // compile the final set of configs
     val configs = utils.fetchEntityConfig(zkUtils, entityType, entityName)
 
@@ -102,6 +108,27 @@ object ConfigCommand extends Config {
     println(s"Completed Updating config for entity: $entity.")
   }
 
+  /**
+   * Replaces user-supplied SCRAM credential properties of the form
+   * "[iterations=N,]password=P" with the serialized salted credential string
+   * stored for the mechanism. Properties for unsupported mechanisms or other
+   * config keys are left untouched.
+   */
+  private def preProcessScramCredentials(configsToBeAdded: Properties) {
+    def scramCredential(mechanism: ScramMechanism, credentialStr: String): String = {
+      // Optional "iterations=N," prefix followed by a mandatory "password=..." value
+      val pattern = "(?:iterations=([0-9]*),)?password=(.*)".r
+      val (iterations, password) = credentialStr match {
+          case pattern(iterations, password) => (if (iterations != null) iterations.toInt else DefaultScramIterations, password)
+          case _ => throw new IllegalArgumentException(s"Invalid credential property $mechanism=$credentialStr")
+        }
+      if (iterations < mechanism.minIterations())
+        throw new IllegalArgumentException(s"Iterations $iterations is less than the minimum ${mechanism.minIterations()} required for $mechanism")
+      val credential = new ScramFormatter(mechanism).generateCredential(password, iterations)
+      ScramCredentialUtils.credentialToString(credential)
+    }
+    // Rewrite the property value in place for every SCRAM mechanism that is present
+    for (mechanism <- ScramMechanism.values) {
+      configsToBeAdded.getProperty(mechanism.mechanismName) match {
+        case null =>
+        case value =>
+          configsToBeAdded.setProperty(mechanism.mechanismName, scramCredential(mechanism, value))
+      }
+    }
+  }
+
   private def parseBroker(broker: String): Int = {
     try broker.toInt
     catch {
@@ -128,9 +155,10 @@ object ConfigCommand extends Config {
     val props = new Properties
     if (opts.options.has(opts.addConfig)) {
       //split by commas, but avoid those in [], then into KV pairs
+      val pattern = "(?=[^\\]]*(?:\\[|$))"
       val configsToBeAdded = opts.options.valueOf(opts.addConfig)
-        .split(",(?=[^\\]]*(?:\\[|$))")
-        .map(_.split("""\s*=\s*"""))
+        .split("," + pattern)
+        .map(_.split("""\s*=\s*""" + pattern))
       require(configsToBeAdded.forall(config => config.length == 2), "Invalid entity config: all configs to be added must be in the format \"key=val\".")
       //Create properties, parsing square brackets from values if necessary
       configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim))
@@ -283,7 +311,7 @@ object ConfigCommand extends Config {
     val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
             "For entity_type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
             "For entity_type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
-            "For entity_type '" + ConfigType.User + "': " + DynamicConfig.Client.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity_type '" + ConfigType.User + "': " + DynamicConfig.User.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
             "For entity_type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
             s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
             .withRequiredArg

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index a820171..96b0e46 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -26,7 +26,7 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.network.{ChannelBuilders, LoginType, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests
 import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
@@ -90,9 +90,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
     val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
     val networkClient = {
-      val channelBuilder = ChannelBuilders.create(
+      val channelBuilder = ChannelBuilders.clientChannelBuilder(
         config.interBrokerSecurityProtocol,
-        Mode.CLIENT,
         LoginType.SERVER,
         config.values,
         config.saslMechanismInterBrokerProtocol,

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 95c5fdf..d8d0144 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -29,11 +29,12 @@ import com.yammer.metrics.core.Gauge
 import kafka.cluster.{BrokerEndPoint, EndPoint}
 import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
+import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, LoginType, Mode, Selectable, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, Selectable, Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -49,7 +50,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
  *   Acceptor has N Processor threads that each have their own selector and read requests from sockets
  *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
  */
-class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
+class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
 
   private val endpoints = config.listeners
   private val numProcessorThreads = config.numNetworkThreads
@@ -147,7 +148,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       config.connectionsMaxIdleMs,
       protocol,
       config.values,
-      metrics
+      metrics,
+      credentialProvider
     )
   }
 
@@ -373,7 +375,8 @@ private[kafka] class Processor(val id: Int,
                                connectionsMaxIdleMs: Long,
                                protocol: SecurityProtocol,
                                channelConfigs: java.util.Map[String, _],
-                               metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                               metrics: Metrics,
+                               credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -411,7 +414,7 @@ private[kafka] class Processor(val id: Int,
     "socket-server",
     metricTags,
     false,
-    ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs, null, true))
+    ChannelBuilders.serverChannelBuilder(protocol, channelConfigs, credentialProvider.credentialCache))
 
   override def run() {
     startupComplete()

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/main/scala/kafka/security/CredentialProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala
new file mode 100644
index 0000000..5d9d7ba
--- /dev/null
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security
+
+import java.util.{List, Properties}
+
+import org.apache.kafka.common.security.authenticator.CredentialCache
+import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism}
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef._
+
+/**
+ * Holds a cache of SCRAM credentials for the SASL mechanisms enabled on this
+ * broker, created at construction, and updates it as user configs change.
+ */
+class CredentialProvider(saslEnabledMechanisms: List[String]) {
+
+  val credentialCache = new CredentialCache
+  ScramCredentialUtils.createCache(credentialCache, saslEnabledMechanisms)
+
+  // Updates (or removes, when the property is absent) the cached credential of
+  // `username` for every SCRAM mechanism whose cache is enabled.
+  def updateCredentials(username: String, config: Properties) {
+    for (mechanism <- ScramMechanism.values()) {
+      val cache = credentialCache.cache(mechanism.mechanismName, classOf[ScramCredential])
+      if (cache != null) {
+        config.getProperty(mechanism.mechanismName) match {
+          case null => cache.remove(username)
+          case c => cache.put(username, ScramCredentialUtils.credentialFromString(c))
+        }
+      }
+    }
+  }
+}
+
+object CredentialProvider {
+  // ConfigDef with one STRING property per SCRAM mechanism, used to validate
+  // dynamic user configs that carry credentials.
+  def userCredentialConfigs: ConfigDef = {
+    ScramMechanism.values.foldLeft(new ConfigDef) {
+      (c, m) => c.define(m.mechanismName, Type.STRING, null, Importance.MEDIUM, s"User credentials for SCRAM mechanism ${m.mechanismName}")
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 7f204d5..56d53ff 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 import DynamicConfig.Broker._
 import kafka.api.ApiVersion
 import kafka.log.{LogConfig, LogManager}
+import kafka.security.CredentialProvider
 import kafka.server.Constants._
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.Logging
@@ -30,6 +31,7 @@ import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.metrics.Quota._
 import scala.collection.JavaConverters._
+
 /**
   * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
   */
@@ -134,7 +136,7 @@ class ClientIdConfigHandler(private val quotaManagers: QuotaManagers) extends Qu
  * The callback provides the node name containing sanitized user principal, client-id if this is
  * a <user, client-id> update and the full properties set read from ZK.
  */
-class UserConfigHandler(private val quotaManagers: QuotaManagers) extends QuotaConfigHandler(quotaManagers) with ConfigHandler {
+class UserConfigHandler(private val quotaManagers: QuotaManagers, val credentialProvider: CredentialProvider) extends QuotaConfigHandler(quotaManagers) with ConfigHandler {
 
   def processConfigChanges(quotaEntityPath: String, config: Properties) {
     // Entity path is <user> or <user>/clients/<client>
@@ -144,6 +146,8 @@ class UserConfigHandler(private val quotaManagers: QuotaManagers) extends QuotaC
     val sanitizedUser = entities(0)
     val clientId = if (entities.length == 3) Some(entities(2)) else None
     updateQuotaConfig(Some(sanitizedUser), clientId, config)
+    if (!clientId.isDefined && sanitizedUser != ConfigEntityName.Default)
+      credentialProvider.updateCredentials(QuotaId.desanitize(sanitizedUser), config)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/main/scala/kafka/server/DynamicConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 4a9d0a9..e68f921 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.util.Properties
 import kafka.log.LogConfig
+import kafka.security.CredentialProvider
 import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.common.config.ConfigDef.Importance._
 import org.apache.kafka.common.config.ConfigDef.Range._
@@ -81,6 +82,18 @@ object DynamicConfig {
     def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props)
   }
 
+  object User {
+
+    //Definitions
+    private val userConfigs = CredentialProvider.userCredentialConfigs
+      .define(Client.ProducerByteRateOverrideProp, LONG, Client.DefaultProducerOverride, MEDIUM, Client.ProducerOverrideDoc)
+      .define(Client.ConsumerByteRateOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ConsumerOverrideDoc)
+
+    def names = userConfigs.names
+
+    def validate(props: Properties) = DynamicConfig.validate(userConfigs, props)
+  }
+
   private def validate(configDef: ConfigDef, props: Properties) = {
     //Validate Names
     val names = configDef.names()

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index ef72847..df40c64 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -33,6 +33,7 @@ import kafka.coordinator.GroupCoordinator
 import kafka.log.{CleanerConfig, LogConfig, LogManager}
 import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
 import kafka.network.{BlockingChannel, SocketServer}
+import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
@@ -117,6 +118,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
   var dynamicConfigManager: DynamicConfigManager = null
+  var credentialProvider: CredentialProvider = null
 
   var groupCoordinator: GroupCoordinator = null
 
@@ -202,8 +204,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         this.logIdent = "[Kafka Server " + config.brokerId + "], "
 
         metadataCache = new MetadataCache(config.brokerId)
+        credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
 
-        socketServer = new SocketServer(config, metrics, time)
+        socketServer = new SocketServer(config, metrics, time, credentialProvider)
         socketServer.startup()
 
         /* start replica manager */
@@ -242,7 +245,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         /* start dynamic config manager */
         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
                                                            ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
-                                                           ConfigType.User -> new UserConfigHandler(quotaManagers),
+                                                           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                            ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
 
         // Create the config manager. start listening to notifications
@@ -351,9 +354,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     def networkClientControlledShutdown(retries: Int): Boolean = {
       val metadataUpdater = new ManualMetadataUpdater()
       val networkClient = {
-        val channelBuilder = ChannelBuilders.create(
+        val channelBuilder = ChannelBuilders.clientChannelBuilder(
           config.interBrokerSecurityProtocol,
-          Mode.CLIENT,
           LoginType.SERVER,
           config.values,
           config.saslMechanismInterBrokerProtocol,

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 3811be3..485a25e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -76,9 +76,8 @@ class ReplicaFetcherThread(name: String,
   // as the metrics tag to avoid metric name conflicts with
   // more than one fetcher thread to the same broker
   private val networkClient = {
-    val channelBuilder = ChannelBuilders.create(
+    val channelBuilder = ChannelBuilders.clientChannelBuilder(
       brokerConfig.interBrokerSecurityProtocol,
-      Mode.CLIENT,
       LoginType.SERVER,
       brokerConfig.values,
       brokerConfig.saslMechanismInterBrokerProtocol,

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 479e749..4d86040 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -60,7 +60,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   override val consumerCount = 2
   override val serverCount = 3
 
-  override def setAclsBeforeServersStart() {
+  override def configureSecurityBeforeServersStart() {
     AclCommand.main(clusterAclArgs)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..b483884
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -0,0 +1,49 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramMechanism
+import kafka.utils.JaasTestUtils
+import kafka.admin.ConfigCommand
+import kafka.utils.ZkUtils
+import scala.collection.JavaConverters._
+
+class SaslScramSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
+  override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList
+  override val clientPrincipal = JaasTestUtils.KafkaScramUser
+  override val kafkaPrincipal = JaasTestUtils.KafkaScramAdmin
+  private val clientPassword = JaasTestUtils.KafkaScramPassword
+  private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
+
+  override def configureSecurityBeforeServersStart() {
+    super.configureSecurityBeforeServersStart()
+    zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
+
+    def configCommandArgs(username: String, password: String) : Array[String] = {
+      val credentials = kafkaServerSaslMechanisms.map(m => s"$m=[iterations=4096,password=$password]")
+      Array("--zookeeper", zkConnect,
+            "--alter", "--add-config", credentials.mkString(","),
+            "--entity-type", "users",
+            "--entity-name", username)
+    }
+    ConfigCommand.main(configCommandArgs(kafkaPrincipal, kafkaPassword))
+    ConfigCommand.main(configCommandArgs(clientPrincipal, clientPassword))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 62015cb..445ee09 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -24,9 +24,12 @@ import kafka.server.{ConfigEntityName, QuotaId}
 import kafka.utils.{Logging, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 
+import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
+import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   @Test
@@ -233,6 +236,51 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def testScramCredentials(): Unit = {
+    def createOpts(user: String, config: String): ConfigCommandOptions = {
+      new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+        "--entity-name", user,
+        "--entity-type", "users",
+        "--alter",
+        "--add-config", config))
+    }
+
+    def deleteOpts(user: String, mechanism: String) = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+        "--entity-name", user,
+        "--entity-type", "users",
+        "--alter",
+        "--delete-config", mechanism))
+
+    val credentials = mutable.Map[String, Properties]()
+    case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations: Int) extends TestAdminUtils {
+      override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {
+        credentials.getOrElse(entityName, new Properties())
+      }
+      override def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configChange: Properties): Unit = {
+        assertEquals(user, sanitizedEntityName)
+        assertEquals(mechanisms, configChange.keySet().asScala)
+        for (mechanism <- mechanisms) {
+          val value = configChange.getProperty(mechanism)
+          assertEquals(-1, value.indexOf("password="))
+          val scramCredential = ScramCredentialUtils.credentialFromString(value)
+          assertEquals(iterations, scramCredential.iterations)
+          if (configChange != null)
+              credentials.put(user, configChange)
+        }
+      }
+    }
+    val optsA = createOpts("userA", "SCRAM-SHA-256=[iterations=8192,password=abc, def]")
+    ConfigCommand.alterConfig(null, optsA, CredentialChange("userA", Set("SCRAM-SHA-256"), 8192))
+    val optsB = createOpts("userB", "SCRAM-SHA-256=[iterations=4096,password=abc, def],SCRAM-SHA-512=[password=1234=abc]")
+    ConfigCommand.alterConfig(null, optsB, CredentialChange("userB", Set("SCRAM-SHA-256", "SCRAM-SHA-512"), 4096))
+
+    val del256 = deleteOpts("userB", "SCRAM-SHA-256")
+    ConfigCommand.alterConfig(null, del256, CredentialChange("userB", Set("SCRAM-SHA-512"), 4096))
+    val del512 = deleteOpts("userB", "SCRAM-SHA-512")
+    ConfigCommand.alterConfig(null, del512, CredentialChange("userB", Set(), 4096))
+  }
+
+  @Test
   def testQuotaConfigEntity() {
 
     def createOpts(entityType: String, entityName: Option[String], otherArgs: Array[String]) : ConfigCommandOptions = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index f418b30..13b37e1 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -46,7 +46,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
   def generateConfigs(): Seq[KafkaConfig]
 
   /**
-   * Override this in case ACLs must be set before `servers` are started.
+   * Override this in case ACLs or security credentials must be set before `servers` are started.
    *
    * This is required in some cases because of the topic creation in the setup of `IntegrationTestHarness`. If the ACLs
    * are only set later, tests may fail. The failure could manifest itself as a cluster action
@@ -56,7 +56,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
    *
    * The default implementation of this method is a no-op.
    */
-  def setAclsBeforeServersStart() {}
+  def configureSecurityBeforeServersStart() {}
 
   def configs: Seq[KafkaConfig] = {
     if (instanceConfigs == null)
@@ -78,7 +78,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
       throw new KafkaException("Must supply at least one server config.")
 
     // default implementation is a no-op, it is overridden by subclasses if required
-    setAclsBeforeServersStart()
+    configureSecurityBeforeServersStart()
 
     servers = configs.map(TestUtils.createServer(_)).toBuffer
     brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index c6f90ff..8547818 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -25,6 +25,7 @@ import javax.net.ssl._
 
 import com.yammer.metrics.core.Gauge
 import com.yammer.metrics.{Metrics => YammerMetrics}
+import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
@@ -54,7 +55,8 @@ class SocketServerTest extends JUnitSuite {
   props.put("connections.max.idle.ms", "60000")
   val config = KafkaConfig.fromProps(props)
   val metrics = new Metrics
-  val server = new SocketServer(config, metrics, Time.SYSTEM)
+  val credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
+  val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider)
   server.startup()
   val sockets = new ArrayBuffer[Socket]
 
@@ -241,7 +243,7 @@ class SocketServerTest extends JUnitSuite {
     val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
     overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum")
     val serverMetrics = new Metrics()
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, Time.SYSTEM)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, Time.SYSTEM, credentialProvider)
     try {
       overrideServer.startup()
       // make the maximum allowable number of connections
@@ -271,7 +273,7 @@ class SocketServerTest extends JUnitSuite {
     overrideProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
 
     val serverMetrics = new Metrics
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, Time.SYSTEM)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, Time.SYSTEM, credentialProvider)
     try {
       overrideServer.startup()
       val sslContext = SSLContext.getInstance("TLSv1.2")
@@ -319,10 +321,10 @@ class SocketServerTest extends JUnitSuite {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
     val serverMetrics = new Metrics
     var conn: Socket = null
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM) {
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
       override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
-          config.connectionsMaxIdleMs, protocol, config.values, metrics) {
+          config.connectionsMaxIdleMs, protocol, config.values, metrics, credentialProvider) {
           override protected[network] def sendResponse(response: RequestChannel.Response) {
             conn.close()
             super.sendResponse(response)
@@ -369,7 +371,7 @@ class SocketServerTest extends JUnitSuite {
     props.setProperty(KafkaConfig.ConnectionsMaxIdleMsProp, "100")
     val serverMetrics = new Metrics
     var conn: Socket = null
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider)
     try {
       overrideServer.startup()
       conn = connect(overrideServer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/275c5e1d/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 70b0b2f..ed08e8a 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -56,6 +56,22 @@ object JaasTestUtils {
     }
   }
 
+  case class ScramLoginModule(username: String,
+                              password: String,
+                              debug: Boolean = false,
+                              validUsers: Map[String, String] = Map.empty) {
+    def toJaasModule: JaasModule = {
+      JaasModule(
+        "org.apache.kafka.common.security.scram.ScramLoginModule",
+        debug = debug,
+        entries = Map(
+          "username" -> username,
+          "password" -> password
+        ) ++ validUsers.map { case (user, pass) => s"user_$user" -> pass }
+      )
+    }
+  }
+
   case class JaasModule(moduleName: String,
                         debug: Boolean,
                         entries: Map[String, String]) {
@@ -94,6 +110,11 @@ object JaasTestUtils {
   private val KafkaPlainAdmin = "admin"
   private val KafkaPlainAdminPassword = "admin-secret"
 
+  val KafkaScramUser = "scram-user"
+  val KafkaScramPassword = "scram-user-secret"
+  val KafkaScramAdmin = "scram-admin"
+  val KafkaScramAdminPassword = "scram-admin-secret"
+
   def writeZkFile(): String = {
     val jaasFile = TestUtils.tempFile()
     writeToFile(jaasFile, zkSections)
@@ -138,6 +159,11 @@ object JaasTestUtils {
           KafkaPlainAdminPassword,
           debug = false,
           Map(KafkaPlainAdmin -> KafkaPlainAdminPassword, KafkaPlainUser -> KafkaPlainPassword)).toJaasModule
+      case "SCRAM-SHA-256" | "SCRAM-SHA-512" =>
+        ScramLoginModule(
+          KafkaScramAdmin,
+          KafkaScramAdminPassword,
+          debug = false).toJaasModule
       case mechanism => throw new IllegalArgumentException("Unsupported server mechanism " + mechanism)
     }
     new JaasSection(KafkaServerContextName, modules)
@@ -159,6 +185,11 @@ object JaasTestUtils {
           KafkaPlainUser,
           KafkaPlainPassword
         ).toJaasModule
+      case "SCRAM-SHA-256" | "SCRAM-SHA-512" =>
+        ScramLoginModule(
+          KafkaScramUser,
+          KafkaScramPassword
+        ).toJaasModule
       case mechanism => throw new IllegalArgumentException("Unsupported client mechanism " + mechanism)
     }
   }


Mime
View raw message