kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [2/3] kafka git commit: KAFKA-4636; Per listener security settings overrides (KIP-103)
Date Fri, 27 Jan 2017 01:25:20 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 0bff0ee..fb00e9c 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -23,8 +23,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
@@ -47,7 +47,9 @@ public class NioEchoServer extends Thread {
     private volatile WritableByteChannel outputChannel;
     private final CredentialCache credentialCache;
 
-    public NioEchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs, String serverHost) throws Exception {
+    public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config, String serverHost) throws Exception {
+        super("echoserver");
+        setDaemon(true);
         serverSocketChannel = ServerSocketChannel.open();
         serverSocketChannel.configureBlocking(false);
         serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
@@ -57,10 +59,8 @@ public class NioEchoServer extends Thread {
         this.credentialCache = new CredentialCache();
         if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL)
             ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
-        ChannelBuilder channelBuilder = ChannelBuilders.serverChannelBuilder(securityProtocol, configs, credentialCache);
+        ChannelBuilder channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache);
         this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
-        setName("echoserver");
-        setDaemon(true);
         acceptorThread = new AcceptorThread();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 01d8a25..3bc1b50 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -14,8 +14,6 @@ package org.apache.kafka.common.network;
 
 import static org.junit.Assert.fail;
 
-import java.util.Arrays;
-import java.util.Map;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -24,19 +22,22 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.Map;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import org.apache.kafka.common.config.types.Password;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -83,7 +84,7 @@ public class SslTransportLayerTest {
     @Test
     public void testValidEndpointIdentification() throws Exception {
         String node = "0";
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
@@ -105,7 +106,7 @@ public class SslTransportLayerTest {
         sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
         sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
         sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -121,7 +122,9 @@ public class SslTransportLayerTest {
     public void testEndpointIdentificationDisabled() throws Exception {
         String node = "0";
         String serverHost = InetAddress.getLocalHost().getHostAddress();
-        server = new NioEchoServer(SecurityProtocol.SSL, sslServerConfigs, serverHost);
+        SecurityProtocol securityProtocol = SecurityProtocol.SSL;
+        server = new NioEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol,
+                new TestSecurityConfig(sslServerConfigs), serverHost);
         server.start();
         sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
         createSelector(sslClientConfigs);
@@ -139,13 +142,53 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationRequiredValidProvided() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
     }
+
+    /**
+     * Tests that disabling client authentication as a listener override has the desired effect.
+     */
+    @Test
+    public void testListenerConfigOverride() throws Exception {
+        String node = "0";
+        ListenerName clientListenerName = new ListenerName("client");
+        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+        sslServerConfigs.put(clientListenerName.configPrefix() + SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
+
+        // `client` listener is not configured at this point, so client auth should be required
+        server = createEchoServer(SecurityProtocol.SSL);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+
+        // Connect with client auth should work fine
+        createSelector(sslClientConfigs);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+        selector.close();
+
+        // Remove client auth, so connection should fail
+        sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+        sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+        sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+        createSelector(sslClientConfigs);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.waitForChannelClose(selector, node);
+        selector.close();
+        server.close();
+
+        // Listener-specific config should be used and client auth should be disabled
+        server = createEchoServer(clientListenerName, SecurityProtocol.SSL);
+        addr = new InetSocketAddress("localhost", server.port());
+
+        // Connect without client auth should work fine now
+        createSelector(sslClientConfigs);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+    }
     
     /**
      * Tests that server does not accept connections from clients with an untrusted certificate
@@ -156,7 +199,7 @@ public class SslTransportLayerTest {
         String node = "0";
         sslServerConfigs = serverCertStores.getUntrustingConfig();
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -172,7 +215,7 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationRequiredNotProvided() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
@@ -193,7 +236,7 @@ public class SslTransportLayerTest {
         String node = "0";
         sslServerConfigs = serverCertStores.getUntrustingConfig();
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -209,7 +252,7 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationDisabledNotProvided() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
@@ -229,7 +272,7 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationRequestedValidProvided() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -245,7 +288,7 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationRequestedNotProvided() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
@@ -310,7 +353,7 @@ public class SslTransportLayerTest {
     public void testInvalidKeyPassword() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("invalid"));
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -325,7 +368,7 @@ public class SslTransportLayerTest {
     public void testUnsupportedTLSVersion() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2"));
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         
         sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1"));
         createSelector(sslClientConfigs);
@@ -343,7 +386,7 @@ public class SslTransportLayerTest {
         String node = "0";
         String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites();
         sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0]));
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         
         sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
         createSelector(sslClientConfigs);
@@ -359,7 +402,7 @@ public class SslTransportLayerTest {
     @Test
     public void testNetReadBufferResize() throws Exception {
         String node = "0";
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs, 10, null, null);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -373,7 +416,7 @@ public class SslTransportLayerTest {
     @Test
     public void testNetWriteBufferResize() throws Exception {
         String node = "0";
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs, null, 10, null);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -387,7 +430,7 @@ public class SslTransportLayerTest {
     @Test
     public void testApplicationBufferResize() throws Exception {
         String node = "0";
-        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs, null, null, 10);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -407,7 +450,7 @@ public class SslTransportLayerTest {
 
     private void testClose(SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception {
         String node = "0";
-        server = NetworkTestUtils.createEchoServer(securityProtocol, sslServerConfigs);
+        server = createEchoServer(securityProtocol);
         clientChannelBuilder.configure(sslClientConfigs);
         this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", clientChannelBuilder);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
@@ -441,7 +484,8 @@ public class SslTransportLayerTest {
         createSelector(sslClientConfigs, null, null, null);
     }      
 
-    private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) {
+    private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize,
+                                final Integer netWriteBufSize, final Integer appBufSize) {
         
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
 
@@ -460,6 +504,14 @@ public class SslTransportLayerTest {
         this.channelBuilder.configure(sslClientConfigs);
         this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
     }
+
+    private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
+        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(sslServerConfigs));
+    }
+
+    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
+        return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
+    }
     
     /**
      * SSLTransportLayer with overrides for packet and application buffer size to test buffer resize

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
new file mode 100644
index 0000000..6040aa2
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+import javax.security.auth.login.Configuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.ListenerName;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests parsing of {@link SaslConfigs#SASL_JAAS_CONFIG} property and verifies that the format
+ * and parsing are consistent with JAAS configuration files loaded by the JRE.
+ */
+public class JaasContextTest {
+
+    private File jaasConfigFile;
+
+    @Before
+    public void setUp() throws IOException {
+        jaasConfigFile = File.createTempFile("jaas", ".conf");
+        jaasConfigFile.deleteOnExit();
+        Configuration.setConfiguration(null);
+        System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.toString());
+    }
+
+    @After
+    public void tearDown() {
+        jaasConfigFile.delete();
+    }
+
+    @Test
+    public void testConfigNoOptions() throws Exception {
+        checkConfiguration("test.testConfigNoOptions", LoginModuleControlFlag.REQUIRED, new HashMap<String, Object>());
+    }
+
+    @Test
+    public void testControlFlag() throws Exception {
+        LoginModuleControlFlag[] controlFlags = new LoginModuleControlFlag[] {
+            LoginModuleControlFlag.REQUIRED,
+            LoginModuleControlFlag.REQUISITE,
+            LoginModuleControlFlag.SUFFICIENT,
+            LoginModuleControlFlag.OPTIONAL
+        };
+        Map<String, Object> options = new HashMap<>();
+        options.put("propName", "propValue");
+        for (LoginModuleControlFlag controlFlag : controlFlags) {
+            checkConfiguration("test.testControlFlag", controlFlag, options);
+        }
+    }
+
+    @Test
+    public void testSingleOption() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        options.put("propName", "propValue");
+        checkConfiguration("test.testSingleOption", LoginModuleControlFlag.REQUISITE, options);
+    }
+
+    @Test
+    public void testMultipleOptions() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        for (int i = 0; i < 10; i++)
+            options.put("propName" + i, "propValue" + i);
+        checkConfiguration("test.testMultipleOptions", LoginModuleControlFlag.SUFFICIENT, options);
+    }
+
+    @Test
+    public void testQuotedOptionValue() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        options.put("propName", "prop value");
+        options.put("propName2", "value1 = 1, value2 = 2");
+        String config = String.format("test.testQuotedOptionValue required propName=\"%s\" propName2=\"%s\";", options.get("propName"), options.get("propName2"));
+        checkConfiguration(config, "test.testQuotedOptionValue", LoginModuleControlFlag.REQUIRED, options);
+    }
+
+    @Test
+    public void testQuotedOptionName() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        options.put("prop name", "propValue");
+        String config = "test.testQuotedOptionName required \"prop name\"=propValue;";
+        checkConfiguration(config, "test.testQuotedOptionName", LoginModuleControlFlag.REQUIRED, options);
+    }
+
+    @Test
+    public void testMultipleLoginModules() throws Exception {
+        StringBuilder builder = new StringBuilder();
+        int moduleCount = 3;
+        Map<Integer, Map<String, Object>> moduleOptions = new HashMap<>();
+        for (int i = 0; i < moduleCount; i++) {
+            Map<String, Object> options = new HashMap<>();
+            options.put("index", "Index" + i);
+            options.put("module", "Module" + i);
+            moduleOptions.put(i, options);
+            String module = jaasConfigProp("test.Module" + i, LoginModuleControlFlag.REQUIRED, options);
+            builder.append(' ');
+            builder.append(module);
+        }
+        String jaasConfigProp = builder.toString();
+
+        String clientContextName = "CLIENT";
+        Configuration configuration = new JaasConfig(clientContextName, jaasConfigProp);
+        AppConfigurationEntry[] dynamicEntries = configuration.getAppConfigurationEntry(clientContextName);
+        assertEquals(moduleCount, dynamicEntries.length);
+
+        for (int i = 0; i < moduleCount; i++) {
+            AppConfigurationEntry entry = dynamicEntries[i];
+            checkEntry(entry, "test.Module" + i, LoginModuleControlFlag.REQUIRED, moduleOptions.get(i));
+        }
+
+        String serverContextName = "SERVER";
+        writeConfiguration(serverContextName, jaasConfigProp);
+        AppConfigurationEntry[] staticEntries = Configuration.getConfiguration().getAppConfigurationEntry(serverContextName);
+        for (int i = 0; i < moduleCount; i++) {
+            AppConfigurationEntry staticEntry = staticEntries[i];
+            checkEntry(staticEntry, dynamicEntries[i].getLoginModuleName(), LoginModuleControlFlag.REQUIRED, dynamicEntries[i].getOptions());
+        }
+    }
+
+    @Test
+    public void testMissingLoginModule() throws Exception {
+        checkInvalidConfiguration("  required option1=value1;");
+    }
+
+    @Test
+    public void testMissingControlFlag() throws Exception {
+        checkInvalidConfiguration("test.loginModule option1=value1;");
+    }
+
+    @Test
+    public void testMissingOptionValue() throws Exception {
+        checkInvalidConfiguration("loginModule required option1;");
+    }
+
+    @Test
+    public void testMissingSemicolon() throws Exception {
+        checkInvalidConfiguration("test.testMissingSemicolon required option1=value1");
+    }
+
+    @Test
+    public void testNumericOptionWithoutQuotes() throws Exception {
+        checkInvalidConfiguration("test.testNumericOptionWithoutQuotes required option1=3;");
+    }
+
+    @Test
+    public void testNumericOptionWithQuotes() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        options.put("option1", "3");
+        String config = "test.testNumericOptionWithQuotes required option1=\"3\";";
+        checkConfiguration(config, "test.testNumericOptionWithQuotes", LoginModuleControlFlag.REQUIRED, options);
+    }
+
+    @Test
+    public void testLoadForServerWithListenerNameOverride() throws IOException {
+        writeConfiguration(Arrays.asList(
+                "KafkaServer { test.LoginModuleDefault required; };",
+                "plaintext.KafkaServer { test.LoginModuleOverride requisite; };"
+        ));
+        JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
+                Collections.<String, Object>emptyMap());
+        assertEquals("plaintext.KafkaServer", context.name());
+        assertEquals(JaasContext.Type.SERVER, context.type());
+        assertEquals(1, context.configurationEntries().size());
+        checkEntry(context.configurationEntries().get(0), "test.LoginModuleOverride",
+                LoginModuleControlFlag.REQUISITE, Collections.<String, Object>emptyMap());
+    }
+
+    @Test
+    public void testLoadForServerWithListenerNameAndFallback() throws IOException {
+        writeConfiguration(Arrays.asList(
+                "KafkaServer { test.LoginModule required; };",
+                "other.KafkaServer { test.LoginModuleOther requisite; };"
+        ));
+        JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
+                Collections.<String, Object>emptyMap());
+        assertEquals("KafkaServer", context.name());
+        assertEquals(JaasContext.Type.SERVER, context.type());
+        assertEquals(1, context.configurationEntries().size());
+        checkEntry(context.configurationEntries().get(0), "test.LoginModule", LoginModuleControlFlag.REQUIRED,
+                Collections.<String, Object>emptyMap());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testLoadForServerWithWrongListenerName() throws IOException {
+        writeConfiguration("Server", "test.LoginModule required;");
+        JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
+                Collections.<String, Object>emptyMap());
+    }
+
+    /**
+     * ListenerName can only be used with Type.SERVER.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testLoadForClientWithListenerName() {
+        JaasContext.load(JaasContext.Type.CLIENT, new ListenerName("foo"),
+                Collections.<String, Object>emptyMap());
+    }
+
+    private AppConfigurationEntry configurationEntry(JaasContext.Type contextType, String jaasConfigProp) {
+        Map<String, Object> configs = new HashMap<>();
+        if (jaasConfigProp != null)
+            configs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigProp));
+        JaasContext context = JaasContext.load(contextType, null, contextType.name(), configs);
+        List<AppConfigurationEntry> entries = context.configurationEntries();
+        assertEquals(1, entries.size());
+        return entries.get(0);
+    }
+
+    private String controlFlag(LoginModuleControlFlag loginModuleControlFlag) {
+        // LoginModuleControlFlag.toString() has format "LoginModuleControlFlag: flag"
+        String[] tokens = loginModuleControlFlag.toString().split(" ");
+        return tokens[tokens.length - 1];
+    }
+
+    private String jaasConfigProp(String loginModule, LoginModuleControlFlag controlFlag, Map<String, Object> options) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(loginModule);
+        builder.append(' ');
+        builder.append(controlFlag(controlFlag));
+        for (Map.Entry<String, Object> entry : options.entrySet()) {
+            builder.append(' ');
+            builder.append(entry.getKey());
+            builder.append('=');
+            builder.append(entry.getValue());
+        }
+        builder.append(';');
+        return builder.toString();
+    }
+
+    private void writeConfiguration(String contextName, String jaasConfigProp) throws IOException {
+        List<String> lines = Arrays.asList(contextName + " { ", jaasConfigProp, "};");
+        writeConfiguration(lines);
+    }
+
+    private void writeConfiguration(List<String> lines) throws IOException {
+        Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8);
+        Configuration.setConfiguration(null);
+    }
+
+    private void checkConfiguration(String loginModule, LoginModuleControlFlag controlFlag, Map<String, Object> options) throws Exception {
+        String jaasConfigProp = jaasConfigProp(loginModule, controlFlag, options);
+        checkConfiguration(jaasConfigProp, loginModule, controlFlag, options);
+    }
+
+    private void checkEntry(AppConfigurationEntry entry, String loginModule, LoginModuleControlFlag controlFlag, Map<String, ?> options) {
+        assertEquals(loginModule, entry.getLoginModuleName());
+        assertEquals(controlFlag, entry.getControlFlag());
+        assertEquals(options, entry.getOptions());
+    }
+
+    private void checkConfiguration(String jaasConfigProp, String loginModule, LoginModuleControlFlag controlFlag, Map<String, Object> options) throws Exception {
+        AppConfigurationEntry dynamicEntry = configurationEntry(JaasContext.Type.CLIENT, jaasConfigProp);
+        checkEntry(dynamicEntry, loginModule, controlFlag, options);
+        assertNull("Static configuration updated", Configuration.getConfiguration().getAppConfigurationEntry(JaasContext.Type.CLIENT.name()));
+
+        writeConfiguration(JaasContext.Type.SERVER.name(), jaasConfigProp);
+        AppConfigurationEntry staticEntry = configurationEntry(JaasContext.Type.SERVER, null);
+        checkEntry(staticEntry, loginModule, controlFlag, options);
+    }
+
+    private void checkInvalidConfiguration(String jaasConfigProp) throws IOException {
+        try {
+            writeConfiguration(JaasContext.Type.SERVER.name(), jaasConfigProp);
+            AppConfigurationEntry entry = configurationEntry(JaasContext.Type.SERVER, null);
+            fail("Invalid JAAS configuration file didn't throw exception, entry=" + entry);
+        } catch (SecurityException e) {
+            // Expected exception
+        }
+        try {
+            AppConfigurationEntry entry = configurationEntry(JaasContext.Type.CLIENT, jaasConfigProp);
+            fail("Invalid JAAS configuration property didn't throw exception, entry=" + entry);
+        } catch (IllegalArgumentException e) {
+            // Expected exception
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java
deleted file mode 100644
index 10ec390..0000000
--- a/clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.security;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
-import javax.security.auth.login.Configuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.network.LoginType;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests parsing of {@link SaslConfigs#SASL_JAAS_CONFIG} property and verifies that the format
- * and parsing are consistent with JAAS configuration files loaded by the JRE.
- */
-public class JaasUtilsTest {
-
-    private File jaasConfigFile;
-
-    @Before
-    public void setUp() throws IOException {
-        jaasConfigFile = File.createTempFile("jaas", ".conf");
-        jaasConfigFile.deleteOnExit();
-        Configuration.setConfiguration(null);
-        System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.toString());
-    }
-
-    @After
-    public void tearDown() {
-        jaasConfigFile.delete();
-    }
-
-    @Test
-    public void testConfigNoOptions() throws Exception {
-        checkConfiguration("test.testConfigNoOptions", LoginModuleControlFlag.REQUIRED, new HashMap<String, Object>());
-    }
-
-    @Test
-    public void testControlFlag() throws Exception {
-        LoginModuleControlFlag[] controlFlags = new LoginModuleControlFlag[] {
-            LoginModuleControlFlag.REQUIRED,
-            LoginModuleControlFlag.REQUISITE,
-            LoginModuleControlFlag.SUFFICIENT,
-            LoginModuleControlFlag.OPTIONAL
-        };
-        Map<String, Object> options = new HashMap<>();
-        options.put("propName", "propValue");
-        for (LoginModuleControlFlag controlFlag : controlFlags) {
-            checkConfiguration("test.testControlFlag", controlFlag, options);
-        }
-    }
-
-    @Test
-    public void testSingleOption() throws Exception {
-        Map<String, Object> options = new HashMap<>();
-        options.put("propName", "propValue");
-        checkConfiguration("test.testSingleOption", LoginModuleControlFlag.REQUISITE, options);
-    }
-
-    @Test
-    public void testMultipleOptions() throws Exception {
-        Map<String, Object> options = new HashMap<>();
-        for (int i = 0; i < 10; i++)
-            options.put("propName" + i, "propValue" + i);
-        checkConfiguration("test.testMultipleOptions", LoginModuleControlFlag.SUFFICIENT, options);
-    }
-
-    @Test
-    public void testQuotedOptionValue() throws Exception {
-        Map<String, Object> options = new HashMap<>();
-        options.put("propName", "prop value");
-        options.put("propName2", "value1 = 1, value2 = 2");
-        String config = String.format("test.testQuotedOptionValue required propName=\"%s\" propName2=\"%s\";", options.get("propName"), options.get("propName2"));
-        checkConfiguration(config, "test.testQuotedOptionValue", LoginModuleControlFlag.REQUIRED, options);
-    }
-
-    @Test
-    public void testQuotedOptionName() throws Exception {
-        Map<String, Object> options = new HashMap<>();
-        options.put("prop name", "propValue");
-        String config = "test.testQuotedOptionName required \"prop name\"=propValue;";
-        checkConfiguration(config, "test.testQuotedOptionName", LoginModuleControlFlag.REQUIRED, options);
-    }
-
-    @Test
-    public void testMultipleLoginModules() throws Exception {
-        StringBuilder builder = new StringBuilder();
-        int moduleCount = 3;
-        Map<Integer, Map<String, Object>> moduleOptions = new HashMap<>();
-        for (int i = 0; i < moduleCount; i++) {
-            Map<String, Object> options = new HashMap<>();
-            options.put("index", "Index" + i);
-            options.put("module", "Module" + i);
-            moduleOptions.put(i, options);
-            String module = jaasConfigProp("test.Module" + i, LoginModuleControlFlag.REQUIRED, options);
-            builder.append(' ');
-            builder.append(module);
-        }
-        String jaasConfigProp = builder.toString();
-
-        Configuration configuration = new JaasConfig(LoginType.CLIENT, jaasConfigProp);
-        AppConfigurationEntry[] dynamicEntries = configuration.getAppConfigurationEntry(LoginType.CLIENT.contextName());
-        assertEquals(moduleCount, dynamicEntries.length);
-
-        for (int i = 0; i < moduleCount; i++) {
-            AppConfigurationEntry entry = dynamicEntries[i];
-            checkEntry(entry, "test.Module" + i, LoginModuleControlFlag.REQUIRED, moduleOptions.get(i));
-        }
-
-        writeConfiguration(LoginType.SERVER, jaasConfigProp);
-        AppConfigurationEntry[] staticEntries = Configuration.getConfiguration().getAppConfigurationEntry(LoginType.SERVER.contextName());
-        for (int i = 0; i < moduleCount; i++) {
-            AppConfigurationEntry staticEntry = staticEntries[i];
-            checkEntry(staticEntry, dynamicEntries[i].getLoginModuleName(), LoginModuleControlFlag.REQUIRED, dynamicEntries[i].getOptions());
-        }
-    }
-
-    @Test
-    public void testMissingLoginModule() throws Exception {
-        checkInvalidConfiguration("  required option1=value1;");
-    }
-
-    @Test
-    public void testMissingControlFlag() throws Exception {
-        checkInvalidConfiguration("test.loginModule option1=value1;");
-    }
-
-    @Test
-    public void testMissingOptionValue() throws Exception {
-        checkInvalidConfiguration("loginModule required option1;");
-    }
-
-    @Test
-    public void testMissingSemicolon() throws Exception {
-        checkInvalidConfiguration("test.testMissingSemicolon required option1=value1");
-    }
-
-    @Test
-    public void testNumericOptionWithoutQuotes() throws Exception {
-        checkInvalidConfiguration("test.testNumericOptionWithoutQuotes required option1=3;");
-    }
-
-    @Test
-    public void testNumericOptionWithQuotes() throws Exception {
-        Map<String, Object> options = new HashMap<>();
-        options.put("option1", "3");
-        String config = "test.testNumericOptionWithQuotes required option1=\"3\";";
-        checkConfiguration(config, "test.testNumericOptionWithQuotes", LoginModuleControlFlag.REQUIRED, options);
-    }
-
-    private AppConfigurationEntry configurationEntry(LoginType loginType, String jaasConfigProp) {
-        Map<String, Object> configs = new HashMap<>();
-        if (jaasConfigProp != null)
-            configs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigProp));
-        Configuration configuration = JaasUtils.jaasConfig(loginType, configs);
-        AppConfigurationEntry[] entry = configuration.getAppConfigurationEntry(loginType.contextName());
-        assertEquals(1, entry.length);
-        return entry[0];
-    }
-
-    private String controlFlag(LoginModuleControlFlag loginModuleControlFlag) {
-        // LoginModuleControlFlag.toString() has format "LoginModuleControlFlag: flag"
-        String[] tokens = loginModuleControlFlag.toString().split(" ");
-        return tokens[tokens.length - 1];
-    }
-
-    private String jaasConfigProp(String loginModule, LoginModuleControlFlag controlFlag, Map<String, Object> options) {
-        StringBuilder builder = new StringBuilder();
-        builder.append(loginModule);
-        builder.append(' ');
-        builder.append(controlFlag(controlFlag));
-        for (Map.Entry<String, Object> entry : options.entrySet()) {
-            builder.append(' ');
-            builder.append(entry.getKey());
-            builder.append('=');
-            builder.append(entry.getValue());
-        }
-        builder.append(';');
-        return builder.toString();
-    }
-
-    private void writeConfiguration(LoginType loginType, String jaasConfigProp) throws IOException {
-        List<String> lines = Arrays.asList(loginType.contextName() + " { ", jaasConfigProp, "};");
-        Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8);
-        Configuration.setConfiguration(null);
-    }
-
-    private void checkConfiguration(String loginModule, LoginModuleControlFlag controlFlag, Map<String, Object> options) throws Exception {
-        String jaasConfigProp = jaasConfigProp(loginModule, controlFlag, options);
-        checkConfiguration(jaasConfigProp, loginModule, controlFlag, options);
-    }
-
-    private void checkEntry(AppConfigurationEntry entry, String loginModule, LoginModuleControlFlag controlFlag, Map<String, ?> options) {
-        assertEquals(loginModule, entry.getLoginModuleName());
-        assertEquals(controlFlag, entry.getControlFlag());
-        assertEquals(options, entry.getOptions());
-    }
-
-    private void checkConfiguration(String jaasConfigProp, String loginModule, LoginModuleControlFlag controlFlag, Map<String, Object> options) throws Exception {
-        AppConfigurationEntry dynamicEntry = configurationEntry(LoginType.CLIENT, jaasConfigProp);
-        checkEntry(dynamicEntry, loginModule, controlFlag, options);
-        assertNull("Static configuration updated", Configuration.getConfiguration().getAppConfigurationEntry(LoginType.CLIENT.contextName()));
-
-        writeConfiguration(LoginType.SERVER, jaasConfigProp);
-        AppConfigurationEntry staticEntry = configurationEntry(LoginType.SERVER, null);
-        checkEntry(staticEntry, loginModule, controlFlag, options);
-    }
-
-    private void checkInvalidConfiguration(String jaasConfigProp) throws IOException {
-        try {
-            writeConfiguration(LoginType.SERVER, jaasConfigProp);
-            AppConfigurationEntry entry = configurationEntry(LoginType.SERVER, null);
-            fail("Invalid JAAS configuration file didn't throw exception, entry=" + entry);
-        } catch (SecurityException e) {
-            // Expected exception
-        }
-        try {
-            AppConfigurationEntry entry = configurationEntry(LoginType.CLIENT, jaasConfigProp);
-            fail("Invalid JAAS configuration property didn't throw exception, entry=" + entry);
-        } catch (IllegalArgumentException e) {
-            // Expected exception
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
new file mode 100644
index 0000000..8c1c038
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.security;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Map;
+
+public class TestSecurityConfig extends AbstractConfig {
+    private static final ConfigDef CONFIG = new ConfigDef()
+            .define(SslConfigs.SSL_CLIENT_AUTH_CONFIG, Type.STRING, null, Importance.MEDIUM,
+                    SslConfigs.SSL_CLIENT_AUTH_DOC)
+            .define(SaslConfigs.SASL_ENABLED_MECHANISMS, Type.LIST, SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS,
+                    Importance.MEDIUM, SaslConfigs.SASL_ENABLED_MECHANISMS_DOC)
+            .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS,
+                    Importance.MEDIUM, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+            .withClientSslSupport()
+            .withClientSaslSupport();
+
+    public TestSecurityConfig(Map<?, ?> originals) {
+        super(CONFIG, originals, false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index ac9beb4..bc967af 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -19,7 +19,7 @@ import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.CertStores;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.ChannelBuilders;
-import org.apache.kafka.common.network.LoginType;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkTestUtils;
 import org.apache.kafka.common.network.NioEchoServer;
@@ -38,7 +38,8 @@ import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
-import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.ScramFormatter;
@@ -63,7 +64,6 @@ import java.util.Random;
 import javax.security.auth.login.Configuration;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -106,7 +106,7 @@ public class SaslAuthenticatorTest {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
 
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         createAndCheckClientConnection(securityProtocol, node);
     }
 
@@ -119,7 +119,7 @@ public class SaslAuthenticatorTest {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
 
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         createAndCheckClientConnection(securityProtocol, node);
     }
 
@@ -133,7 +133,7 @@ public class SaslAuthenticatorTest {
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
         jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, "invalidpassword");
 
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         createClientConnection(securityProtocol, node);
         NetworkTestUtils.waitForChannelClose(selector, node);
     }
@@ -148,7 +148,7 @@ public class SaslAuthenticatorTest {
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
         jaasConfig.setPlainClientOptions("invaliduser", TestJaasConfig.PASSWORD);
 
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         createClientConnection(securityProtocol, node);
         NetworkTestUtils.waitForChannelClose(selector, node);
     }
@@ -163,7 +163,7 @@ public class SaslAuthenticatorTest {
         jaasConfig.setPlainClientOptions(null, "mypassword");
 
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         createSelector(securityProtocol, saslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
         try {
@@ -184,7 +184,7 @@ public class SaslAuthenticatorTest {
         jaasConfig.setPlainClientOptions("myuser", null);
 
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         createSelector(securityProtocol, saslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
         try {
@@ -205,7 +205,7 @@ public class SaslAuthenticatorTest {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5"));
 
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         createAndCheckClientConnection(securityProtocol, node);
     }
 
@@ -217,7 +217,7 @@ public class SaslAuthenticatorTest {
     public void testMultipleServerMechanisms() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN", "SCRAM-SHA-256"));
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
 
         String node1 = "1";
@@ -230,15 +230,12 @@ public class SaslAuthenticatorTest {
         InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
         selector.connect(node2, addr, BUFFER_SIZE, BUFFER_SIZE);
         NetworkTestUtils.checkClientConnection(selector, node2, 100, 10);
-        selector.close();
 
         String node3 = "3";
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         createSelector(securityProtocol, saslClientConfigs);
         selector.connect(node3, new InetSocketAddress("127.0.0.1", server.port()), BUFFER_SIZE, BUFFER_SIZE);
         NetworkTestUtils.checkClientConnection(selector, node3, 100, 10);
-        selector.close();
-        selector = null;
     }
 
     /**
@@ -249,7 +246,7 @@ public class SaslAuthenticatorTest {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
 
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
         createAndCheckClientConnection(securityProtocol, "0");
     }
@@ -262,7 +259,7 @@ public class SaslAuthenticatorTest {
     public void testValidSaslScramMechanisms() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         configureMechanisms("SCRAM-SHA-256", new ArrayList<>(ScramMechanism.mechanismNames()));
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
 
         for (String mechanism : ScramMechanism.mechanismNames()) {
@@ -281,10 +278,10 @@ public class SaslAuthenticatorTest {
         Map<String, Object> options = new HashMap<>();
         options.put("username", TestJaasConfig.USERNAME);
         options.put("password", "invalidpassword");
-        jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
+        jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
 
         String node = "0";
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
         createClientConnection(securityProtocol, node);
         NetworkTestUtils.waitForChannelClose(selector, node);
@@ -300,10 +297,10 @@ public class SaslAuthenticatorTest {
         Map<String, Object> options = new HashMap<>();
         options.put("username", "unknownUser");
         options.put("password", TestJaasConfig.PASSWORD);
-        jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
+        jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
 
         String node = "0";
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
         createClientConnection(securityProtocol, node);
         NetworkTestUtils.waitForChannelClose(selector, node);
@@ -317,7 +314,7 @@ public class SaslAuthenticatorTest {
     public void testUserCredentialsUnavailableForScramMechanism() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         configureMechanisms("SCRAM-SHA-256", new ArrayList<>(ScramMechanism.mechanismNames()));
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
 
         server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME);
@@ -325,7 +322,6 @@ public class SaslAuthenticatorTest {
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         createClientConnection(securityProtocol, node);
         NetworkTestUtils.waitForChannelClose(selector, node);
-        selector.close();
 
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
         createAndCheckClientConnection(securityProtocol, "2");
@@ -344,9 +340,9 @@ public class SaslAuthenticatorTest {
         Map<String, Object> options = new HashMap<>();
         options.put("username", username);
         options.put("password", password);
-        jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
+        jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
 
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         updateScramCredentialCache(username, password);
         createAndCheckClientConnection(securityProtocol, "0");
     }
@@ -386,7 +382,7 @@ public class SaslAuthenticatorTest {
     public void testApiVersionsRequestWithUnsupportedVersion() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
 
         // Send ApiVersionsRequest with unsupported version and validate error response.
         String node = "1";
@@ -418,7 +414,7 @@ public class SaslAuthenticatorTest {
     public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
 
         // Send ApiVersionsRequest and validate error response.
         String node1 = "invalid1";
@@ -442,7 +438,7 @@ public class SaslAuthenticatorTest {
     public void testInvalidSaslPacket() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
 
         // Send invalid SASL packet after valid handshake request
         String node1 = "invalid1";
@@ -481,7 +477,7 @@ public class SaslAuthenticatorTest {
     public void testInvalidApiVersionsRequestSequence() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
 
         // Send handshake request followed by ApiVersionsRequest
         String node1 = "invalid1";
@@ -508,7 +504,7 @@ public class SaslAuthenticatorTest {
     public void testPacketSizeTooBig() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
 
         // Send SASL packet with large size after valid handshake request
         String node1 = "invalid1";
@@ -548,7 +544,7 @@ public class SaslAuthenticatorTest {
     public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
 
         // Send metadata request before Kafka SASL handshake request
         String node1 = "invalid1";
@@ -586,10 +582,10 @@ public class SaslAuthenticatorTest {
     @Test
     public void testInvalidLoginModule() throws Exception {
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, "InvalidLoginModule", TestJaasConfig.defaultClientOptions());
+        jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, "InvalidLoginModule", TestJaasConfig.defaultClientOptions());
 
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         try {
             createSelector(securityProtocol, saslClientConfigs);
             fail("SASL/PLAIN channel created without valid login module");
@@ -608,7 +604,7 @@ public class SaslAuthenticatorTest {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5"));
 
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         createClientConnection(securityProtocol, node);
         NetworkTestUtils.waitForChannelClose(selector, node);
     }
@@ -623,7 +619,7 @@ public class SaslAuthenticatorTest {
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID");
 
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
         createClientConnection(securityProtocol, node);
         NetworkTestUtils.waitForChannelClose(selector, node);
     }
@@ -642,10 +638,11 @@ public class SaslAuthenticatorTest {
         serverOptions.put("user_user1", "user1-secret");
         serverOptions.put("user_user2", "user2-secret");
         TestJaasConfig staticJaasConfig = new TestJaasConfig();
-        staticJaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), serverOptions);
+        staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(),
+                serverOptions);
         staticJaasConfig.setPlainClientOptions("user1", "invalidpassword");
         Configuration.setConfiguration(staticJaasConfig);
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
 
         // Check that client using static Jaas config does not connect since password is invalid
         createAndCheckClientConnectionFailure(securityProtocol, "1");
@@ -669,11 +666,52 @@ public class SaslAuthenticatorTest {
         try {
             createClientConnection(securityProtocol, "1");
             fail("Connection created with multiple login modules in sasl.jaas.config");
-        } catch (KafkaException e) {
-            assertTrue("Unexpected exception " + e, e.getCause() instanceof IllegalArgumentException);
+        } catch (IllegalArgumentException e) {
+            // Expected
         }
     }
 
+    @Test
+    public void testJaasConfigurationForListener() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, Arrays.asList("PLAIN"));
+
+        TestJaasConfig staticJaasConfig = new TestJaasConfig();
+
+        Map<String, Object> globalServerOptions = new HashMap<>();
+        globalServerOptions.put("user_global1", "gsecret1");
+        globalServerOptions.put("user_global2", "gsecret2");
+        staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(),
+                globalServerOptions);
+
+        Map<String, Object> clientListenerServerOptions = new HashMap<>();
+        clientListenerServerOptions.put("user_client1", "csecret1");
+        clientListenerServerOptions.put("user_client2", "csecret2");
+        String clientJaasEntryName = "client." + TestJaasConfig.LOGIN_CONTEXT_SERVER;
+        staticJaasConfig.createOrUpdateEntry(clientJaasEntryName, PlainLoginModule.class.getName(), clientListenerServerOptions);
+        Configuration.setConfiguration(staticJaasConfig);
+
+        // Listener-specific credentials
+        server = createEchoServer(new ListenerName("client"), securityProtocol);
+        saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
+                TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1"));
+        createAndCheckClientConnection(securityProtocol, "1");
+        saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
+                TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1"));
+        createAndCheckClientConnectionFailure(securityProtocol, "2");
+        server.close();
+
+        // Global credentials as there is no listener-specific JAAS entry
+        server = createEchoServer(new ListenerName("other"), securityProtocol);
+        saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
+                TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1"));
+        createAndCheckClientConnection(securityProtocol, "3");
+        saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
+                TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1"));
+        createAndCheckClientConnectionFailure(securityProtocol, "4");
+    }
+
     /**
      * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
      * prior to SASL handshake flow and that subsequent authentication succeeds
@@ -700,7 +738,7 @@ public class SaslAuthenticatorTest {
      */
     private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol) throws Exception {
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
-        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+        server = createEchoServer(securityProtocol);
 
         // Create non-SASL connection to manually authenticate after ApiVersionsRequest
         String node = "1";
@@ -748,11 +786,26 @@ public class SaslAuthenticatorTest {
     }
 
     private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) {
+        if (selector != null) {
+            selector.close();
+            selector = null;
+        }
+
         String saslMechanism = (String) saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
-        this.channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, LoginType.CLIENT, clientConfigs, saslMechanism, true);
+        this.channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT,
+                new TestSecurityConfig(clientConfigs), null, saslMechanism, true);
         this.selector = NetworkTestUtils.createSelector(channelBuilder);
     }
 
+    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
+        return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
+    }
+
+    private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
+        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol,
+                new TestSecurityConfig(saslServerConfigs));
+    }
+
     private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
         createSelector(securityProtocol, saslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
index a27b87a..fb73d69 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
@@ -22,13 +22,15 @@ import javax.security.auth.login.Configuration;
 import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
 
 import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.security.scram.ScramMechanism;
 
 public class TestJaasConfig extends Configuration {
 
+    static final String LOGIN_CONTEXT_CLIENT = "KafkaClient";
+    static final String LOGIN_CONTEXT_SERVER = "KafkaServer";
+
     static final String USERNAME = "myuser";
     static final String PASSWORD = "mypassword";
 
@@ -36,9 +38,9 @@ public class TestJaasConfig extends Configuration {
 
     public static TestJaasConfig createConfiguration(String clientMechanism, List<String> serverMechanisms) {
         TestJaasConfig config = new TestJaasConfig();
-        config.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, loginModule(clientMechanism), defaultClientOptions());
+        config.createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, loginModule(clientMechanism), defaultClientOptions());
         for (String mechanism : serverMechanisms) {
-            config.addEntry(JaasUtils.LOGIN_CONTEXT_SERVER, loginModule(mechanism), defaultServerOptions(mechanism));
+            config.addEntry(LOGIN_CONTEXT_SERVER, loginModule(mechanism), defaultServerOptions(mechanism));
         }
         Configuration.setConfiguration(config);
         return config;
@@ -54,7 +56,7 @@ public class TestJaasConfig extends Configuration {
             options.put("username", clientUsername);
         if (clientPassword != null)
             options.put("password", clientPassword);
-        createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, PlainLoginModule.class.getName(), options);
+        createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, PlainLoginModule.class.getName(), options);
     }
 
     public void createOrUpdateEntry(String name, String loginModule, Map<String, Object> options) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 91e921f..3c17294 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -167,7 +167,7 @@ public class TestSslUtils {
     }
 
     private static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword,
-                                                      File trustStoreFile, Password trustStorePassword) {
+                                                       File trustStoreFile, Password trustStorePassword) {
         Map<String, Object> sslConfigs = new HashMap<>();
         sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index ac13472..f61eaa2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -90,7 +90,7 @@ public class WorkerGroupMember {
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 0);
             String metricGrpPrefix = "connect";
-            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
+            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
             NetworkClient netClient = new NetworkClient(
                     new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                     this.metadata,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 680c5e1..13b6571 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -226,7 +226,7 @@ object AdminClient {
     val time = Time.SYSTEM
     val metrics = new Metrics(time)
     val metadata = new Metadata
-    val channelBuilder = ClientUtils.createChannelBuilder(config.values())
+    val channelBuilder = ClientUtils.createChannelBuilder(config)
 
     val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
     val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index d928034..d8e6a95 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -26,11 +26,12 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, LoginType, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests
 import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
 import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint
+import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, TopicPartition}
 
@@ -94,8 +95,9 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     val networkClient = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
         config.interBrokerSecurityProtocol,
-        LoginType.SERVER,
-        config.values,
+        JaasContext.Type.SERVER,
+        config,
+        config.interBrokerListenerName,
         config.saslMechanismInterBrokerProtocol,
         config.saslInterBrokerHandshakeRequestEnable
       )

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index c0353d5..b9bf3e4 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -34,7 +34,7 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, LoginType, Mode, Selectable, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Mode, Selectable, Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -150,7 +150,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       config.connectionsMaxIdleMs,
       listenerName,
       securityProtocol,
-      config.values,
+      config,
       metrics,
       credentialProvider
     )
@@ -379,7 +379,7 @@ private[kafka] class Processor(val id: Int,
                                connectionsMaxIdleMs: Long,
                                listenerName: ListenerName,
                                securityProtocol: SecurityProtocol,
-                               channelConfigs: java.util.Map[String, _],
+                               config: KafkaConfig,
                                metrics: Metrics,
                                credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
@@ -419,7 +419,7 @@ private[kafka] class Processor(val id: Int,
     "socket-server",
     metricTags,
     false,
-    ChannelBuilders.serverChannelBuilder(securityProtocol, channelConfigs, credentialProvider.credentialCache))
+    ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
 
   override def run() {
     startupComplete()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b5075f9..dcbd3b4 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminUtils
 import kafka.api.KAFKA_0_9_0
-import kafka.cluster.{Broker, EndPoint}
+import kafka.cluster.Broker
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.coordinator.GroupCoordinator
@@ -37,13 +37,13 @@ import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.{ClientRequest, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
-import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
 
@@ -360,8 +360,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
       val networkClient = {
         val channelBuilder = ChannelBuilders.clientChannelBuilder(
           config.interBrokerSecurityProtocol,
-          LoginType.SERVER,
-          config.values,
+          JaasContext.Type.SERVER,
+          config,
+          config.interBrokerListenerName,
           config.saslMechanismInterBrokerProtocol,
           config.saslInterBrokerHandshakeRequestEnable)
         val selector = new Selector(

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d6663fa..df640eb 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -27,13 +27,14 @@ import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_
 import kafka.common.KafkaStorageException
 import ReplicaFetcherThread._
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.network.{ChannelBuilders, Mode, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
 import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
 import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.Map
@@ -78,8 +79,9 @@ class ReplicaFetcherThread(name: String,
   private val networkClient = {
     val channelBuilder = ChannelBuilders.clientChannelBuilder(
       brokerConfig.interBrokerSecurityProtocol,
-      LoginType.SERVER,
-      brokerConfig.values,
+      JaasContext.Type.SERVER,
+      brokerConfig,
+      brokerConfig.interBrokerListenerName,
       brokerConfig.saslMechanismInterBrokerProtocol,
       brokerConfig.saslInterBrokerHandshakeRequestEnable
     )

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 92088f8..10baa42 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -48,6 +48,12 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
   override def generateConfigs() = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+    cfgs.foreach { config =>
+      config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
+      config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
+      config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
+      config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
+    }
     cfgs.foreach(_.putAll(serverConfig))
     cfgs.map(KafkaConfig.fromProps)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index 826eb5c..dd91627 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -37,14 +37,15 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   
   @Before
   override def setUp {
-    startSasl(Both, kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism))
+    startSasl(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both)
     super.setUp
   }
 
   // Use JAAS configuration properties for clients so that dynamic JAAS configuration is also tested by this set of tests
-  override protected def setJaasConfiguration(mode: SaslSetupMode, serverMechanisms: List[String], clientMechanism: Option[String]) {
+  override protected def setJaasConfiguration(mode: SaslSetupMode, serverEntryName: String,
+                                              serverMechanisms: List[String], clientMechanism: Option[String]) {
     // create static config with client login context with credentials for JaasTestUtils 'client2'
-    super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, clientMechanism)
+    super.setJaasConfiguration(mode, serverEntryName, kafkaServerSaslMechanisms, clientMechanism)
     // set dynamic properties with credentials for JaasTestUtils 'client1'
     val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
     producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 125d431..ddf9578 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -13,13 +13,20 @@
 package kafka.api
 
 import java.io.File
+import java.util.Locale
+
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
+import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.network.ListenerName
 
 class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarness {
   override protected val zkSaslEnabled = true
+  override protected def listenerName = new ListenerName("CLIENT")
   override protected val kafkaClientSaslMechanism = "PLAIN"
   override protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
+  override protected val kafkaServerJaasEntryName =
+    s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))


Mime
View raw message