From commits-return-8854-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Sun Feb 4 18:19:27 2018
Return-Path:
X-Original-To: archive-asf-public@eu.ponee.io
Delivered-To: archive-asf-public@eu.ponee.io
Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183])
by mx-eu-01.ponee.io (Postfix) with ESMTP id B40A718064A
for ; Sun, 4 Feb 2018 18:19:27 +0100 (CET)
Received: by cust-asf.ponee.io (Postfix)
id A39B7160C40; Sun, 4 Feb 2018 17:19:27 +0000 (UTC)
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by cust-asf.ponee.io (Postfix) with SMTP id 31135160C37
for ; Sun, 4 Feb 2018 18:19:24 +0100 (CET)
Received: (qmail 10433 invoked by uid 500); 4 Feb 2018 17:19:23 -0000
Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@kafka.apache.org
Delivered-To: mailing list commits@kafka.apache.org
Received: (qmail 10424 invoked by uid 99); 4 Feb 2018 17:19:23 -0000
Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70)
by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Feb 2018 17:19:23 +0000
Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33)
id EF64381FCC; Sun, 4 Feb 2018 17:19:21 +0000 (UTC)
Date: Sun, 04 Feb 2018 17:19:21 +0000
To: "commits@kafka.apache.org"
Subject: [kafka] branch trunk updated: KAFKA-6246;
Dynamic update of listeners and security configs (#4488)
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
Message-ID: <151776476149.26683.15052716633963097665@gitbox.apache.org>
From: jgus@apache.org
X-Git-Host: gitbox.apache.org
X-Git-Repo: kafka
X-Git-Refname: refs/heads/trunk
X-Git-Reftype: branch
X-Git-Oldrev: f9b56d680bab86fad276d01c8421d3679a788f2b
X-Git-Newrev: 4019b21d602c0912d9dc0286f10701e7e724f750
X-Git-Rev: 4019b21d602c0912d9dc0286f10701e7e724f750
X-Git-NotificationType: ref_changed_plus_diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4019b21 KAFKA-6246; Dynamic update of listeners and security configs (#4488)
4019b21 is described below
commit 4019b21d602c0912d9dc0286f10701e7e724f750
Author: Rajini Sivaram
AuthorDate: Sun Feb 4 09:19:17 2018 -0800
KAFKA-6246; Dynamic update of listeners and security configs (#4488)
Dynamic update of listeners as described in KIP-226. This includes:
- Addition of new listeners with listener-prefixed security configs
- Removal of existing listeners
- Password encryption
- sasl.jaas.config property for broker's JAAS config prefixed with listener and mechanism name
---
checkstyle/import-control.xml | 1 +
.../org/apache/kafka/common/Reconfigurable.java | 9 +-
.../apache/kafka/common/config/AbstractConfig.java | 16 +
.../kafka/common/network/ChannelBuilders.java | 18 +-
.../apache/kafka/common/network/ListenerName.java | 4 +
.../kafka/common/network/SaslChannelBuilder.java | 58 +--
.../kafka/common/network/SslChannelBuilder.java | 4 +-
.../apache/kafka/common/security/JaasContext.java | 84 ++---
.../security/authenticator/CredentialCache.java | 8 +-
.../security/authenticator/LoginManager.java | 4 +-
.../authenticator/SaslServerAuthenticator.java | 23 +-
.../security/scram/ScramCredentialUtils.java | 4 +-
.../kafka/common/security/ssl/SslFactory.java | 7 +-
.../kafka/common/config/AbstractConfigTest.java | 49 +++
.../common/network/SaslChannelBuilderTest.java | 20 +-
.../kafka/common/security/JaasContextTest.java | 25 +-
.../authenticator/SaslAuthenticatorTest.java | 48 ++-
.../authenticator/SaslServerAuthenticatorTest.java | 13 +-
.../security/authenticator/TestJaasConfig.java | 14 +
.../scala/kafka/controller/KafkaController.scala | 67 +++-
.../main/scala/kafka/network/SocketServer.scala | 56 ++-
.../scala/kafka/security/CredentialProvider.scala | 6 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 254 ++++++++++---
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 99 +++--
core/src/main/scala/kafka/server/KafkaServer.scala | 9 +-
.../main/scala/kafka/utils/PasswordEncoder.scala | 175 +++++++++
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 7 +
.../kafka/api/AdminClientIntegrationTest.scala | 4 +-
.../scala/integration/kafka/api/SaslSetup.scala | 16 +-
.../server/DynamicBrokerReconfigurationTest.scala | 404 +++++++++++++++++++--
...pleListenersWithAdditionalJaasContextTest.scala | 24 +-
...ltipleListenersWithDefaultJaasContextTest.scala | 9 +-
...ListenersWithSameSecurityProtocolBaseTest.scala | 81 +++--
.../test/scala/unit/kafka/KafkaConfigTest.scala | 12 +-
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 4 +-
.../unit/kafka/network/SocketServerTest.scala | 5 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 155 ++++++--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 10 +
.../unit/kafka/utils/PasswordEncoderTest.scala | 127 +++++++
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
41 files changed, 1574 insertions(+), 363 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5662552..000acc3 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -50,6 +50,7 @@
+
diff --git a/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java b/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java
index 3339dce..8db9dc2 100644
--- a/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java
+++ b/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common;
+import org.apache.kafka.common.config.ConfigException;
+
import java.util.Map;
import java.util.Set;
@@ -33,9 +35,12 @@ public interface Reconfigurable extends Configurable {
* Validates the provided configuration. The provided map contains
* all configs including any reconfigurable configs that may be different
* from the initial configuration. Reconfiguration will be not performed
- * if this method returns false or throws any exception.
+ * if this method throws any exception.
+ * @throws ConfigException if the provided configs are not valid. The exception
+ * message from ConfigException will be returned to the client in
+ * the AlterConfigs response.
*/
- boolean validateReconfiguration(Map configs);
+ void validateReconfiguration(Map configs) throws ConfigException;
/**
* Reconfigures this instance with the given key-value pairs. The provided
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index ce3ae43..1713d78 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -204,6 +204,16 @@ public class AbstractConfig {
* put all the remaining keys with the prefix stripped and their parsed values in the result map.
*
* This is useful if one wants to allow prefixed configs to override default ones.
+ *
+ * Two forms of prefixes are supported:
+ *
+ * - listener.name.{listenerName}.some.prop: If the provided prefix is `listener.name.{listenerName}.`,
+ * the key `some.prop` with the value parsed using the definition of `some.prop` is returned.
+ * - listener.name.{listenerName}.{mechanism}.some.prop: If the provided prefix is `listener.name.{listenerName}.`,
+ * the key `{mechanism}.some.prop` with the value parsed using the definition of `some.prop` is returned.
+ * This is used to provide per-mechanism configs for a broker listener (e.g. sasl.jaas.config)
+ *
+ *
*/
public Map valuesWithPrefixOverride(String prefix) {
Map result = new RecordingMap<>(values(), prefix, true);
@@ -213,6 +223,12 @@ public class AbstractConfig {
ConfigDef.ConfigKey configKey = definition.configKeys().get(keyWithNoPrefix);
if (configKey != null)
result.put(keyWithNoPrefix, definition.parseValue(configKey, entry.getValue(), true));
+ else {
+ String keyWithNoSecondaryPrefix = keyWithNoPrefix.substring(keyWithNoPrefix.indexOf('.') + 1);
+ configKey = definition.configKeys().get(keyWithNoSecondaryPrefix);
+ if (configKey != null)
+ result.put(keyWithNoPrefix, definition.parseValue(configKey, entry.getValue(), true));
+ }
}
}
return result;
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index e6df78e..80ccb7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -29,6 +29,9 @@ import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.apache.kafka.common.utils.Utils;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class ChannelBuilders {
@@ -105,9 +108,20 @@ public class ChannelBuilders {
case SASL_SSL:
case SASL_PLAINTEXT:
requireNonNullMode(mode, securityProtocol);
- JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
+ Map jaasContexts;
+ if (mode == Mode.SERVER) {
+ List enabledMechanisms = (List) configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
+ jaasContexts = new HashMap<>(enabledMechanisms.size());
+ for (String mechanism : enabledMechanisms)
+ jaasContexts.put(mechanism, JaasContext.loadServerContext(listenerName, mechanism, configs));
+ } else {
+ // Use server context for inter-broker client connections and client context for other clients
+ JaasContext jaasContext = contextType == JaasContext.Type.CLIENT ? JaasContext.loadClientContext(configs) :
+ JaasContext.loadServerContext(listenerName, clientSaslMechanism, configs);
+ jaasContexts = Collections.singletonMap(clientSaslMechanism, jaasContext);
+ }
channelBuilder = new SaslChannelBuilder(mode,
- jaasContext,
+ jaasContexts,
securityProtocol,
listenerName,
isInterBrokerListener,
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
index 9da4cca..fc0cb14 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
@@ -71,4 +71,8 @@ public final class ListenerName {
public String configPrefix() {
return CONFIG_STATIC_PREFIX + "." + value.toLowerCase(Locale.ROOT) + ".";
}
+
+ public String saslMechanismConfigPrefix(String saslMechanism) {
+ return configPrefix() + saslMechanism.toLowerCase(Locale.ROOT) + ".";
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 1716e0e..095f826 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -41,6 +41,7 @@ import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -55,18 +56,19 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
private final boolean isInterBrokerListener;
private final String clientSaslMechanism;
private final Mode mode;
- private final JaasContext jaasContext;
+ private final Map jaasContexts;
private final boolean handshakeRequestEnable;
private final CredentialCache credentialCache;
private final DelegationTokenCache tokenCache;
+ private final Map loginManagers;
+ private final Map subjects;
- private LoginManager loginManager;
private SslFactory sslFactory;
private Map configs;
private KerberosShortNamer kerberosShortNamer;
public SaslChannelBuilder(Mode mode,
- JaasContext jaasContext,
+ Map jaasContexts,
SecurityProtocol securityProtocol,
ListenerName listenerName,
boolean isInterBrokerListener,
@@ -75,7 +77,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
CredentialCache credentialCache,
DelegationTokenCache tokenCache) {
this.mode = mode;
- this.jaasContext = jaasContext;
+ this.jaasContexts = jaasContexts;
+ this.loginManagers = new HashMap<>(jaasContexts.size());
+ this.subjects = new HashMap<>(jaasContexts.size());
this.securityProtocol = securityProtocol;
this.listenerName = listenerName;
this.isInterBrokerListener = isInterBrokerListener;
@@ -89,14 +93,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
public void configure(Map configs) throws KafkaException {
try {
this.configs = configs;
- boolean hasKerberos;
- if (mode == Mode.SERVER) {
- @SuppressWarnings("unchecked")
- List enabledMechanisms = (List) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
- hasKerberos = enabledMechanisms == null || enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM);
- } else {
- hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
- }
+ boolean hasKerberos = jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM);
if (hasKerberos) {
String defaultRealm;
@@ -110,8 +107,14 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
if (principalToLocalRules != null)
kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
}
- this.loginManager = LoginManager.acquireLoginManager(jaasContext, hasKerberos, configs);
-
+ for (Map.Entry entry : jaasContexts.entrySet()) {
+ String mechanism = entry.getKey();
+ // With static JAAS configuration, use KerberosLogin if Kerberos is enabled. With dynamic JAAS configuration,
+ // use KerberosLogin only for the LoginContext corresponding to GSSAPI
+ LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, hasKerberos, configs);
+ loginManagers.put(mechanism, loginManager);
+ subjects.put(mechanism, loginManager.subject());
+ }
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
// Disable SSL client authentication as we are using SASL authentication
this.sslFactory = new SslFactory(mode, "none", isInterBrokerListener);
@@ -129,11 +132,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
}
@Override
- public boolean validateReconfiguration(Map configs) {
+ public void validateReconfiguration(Map configs) {
if (this.securityProtocol == SecurityProtocol.SASL_SSL)
- return sslFactory.validateReconfiguration(configs);
- else
- return true;
+ sslFactory.validateReconfiguration(configs);
}
@Override
@@ -154,11 +155,13 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
Socket socket = socketChannel.socket();
TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
Authenticator authenticator;
- if (mode == Mode.SERVER)
- authenticator = buildServerAuthenticator(configs, id, transportLayer, loginManager.subject());
- else
+ if (mode == Mode.SERVER) {
+ authenticator = buildServerAuthenticator(configs, id, transportLayer, subjects);
+ } else {
+ LoginManager loginManager = loginManagers.get(clientSaslMechanism);
authenticator = buildClientAuthenticator(configs, id, socket.getInetAddress().getHostName(),
loginManager.serviceName(), transportLayer, loginManager.subject());
+ }
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) {
log.info("Failed to create channel due to ", e);
@@ -168,10 +171,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
@Override
public void close() {
- if (loginManager != null) {
+ for (LoginManager loginManager : loginManagers.values())
loginManager.release();
- loginManager = null;
- }
+ loginManagers.clear();
}
private TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
@@ -185,8 +187,8 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
// Visible to override for testing
protected SaslServerAuthenticator buildServerAuthenticator(Map configs, String id,
- TransportLayer transportLayer, Subject subject) throws IOException {
- return new SaslServerAuthenticator(configs, id, jaasContext, subject,
+ TransportLayer transportLayer, Map subjects) throws IOException {
+ return new SaslServerAuthenticator(configs, id, jaasContexts, subjects,
kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer, tokenCache);
}
@@ -198,8 +200,8 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
}
// Package private for testing
- LoginManager loginManager() {
- return loginManager;
+ Map loginManagers() {
+ return loginManagers;
}
private static String defaultKerberosRealm() throws ClassNotFoundException, NoSuchMethodException,
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index e024d32..941c455 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -71,8 +71,8 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
}
@Override
- public boolean validateReconfiguration(Map configs) {
- return sslFactory.validateReconfiguration(configs);
+ public void validateReconfiguration(Map configs) {
+ sslFactory.validateReconfiguration(configs);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
index a13acd2..46db345 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
@@ -41,57 +41,59 @@ public class JaasContext {
/**
* Returns an instance of this class.
*
- * For contextType SERVER, the context will contain the default Configuration and the context name will be one of:
+ * The context will contain the configuration specified by the JAAS configuration property
+ * {@link SaslConfigs#SASL_JAAS_CONFIG} with prefix `listener.name.{listenerName}.{mechanism}.`
+ * with listenerName and mechanism in lower case. The context `KafkaServer` will be returned
+ * with a single login context entry loaded from the property.
+ *
+ * If the property is not defined, the context will contain the default Configuration and
+ * the context name will be one of:
+ *
+ * - Lowercased listener name followed by a period and the string `KafkaServer`
+ * - The string `KafkaServer`
+ *
+ * If both are valid entries in the default JAAS configuration, the first option is chosen.
+ *
*
- * 1. Lowercased listener name followed by a period and the string `KafkaServer`
- * 2. The string `KafkaServer`
- *
- * If both are valid entries in the JAAS configuration, the first option is chosen.
+ * @throws IllegalArgumentException if listenerName or mechanism is not defined.
+ */
+ public static JaasContext loadServerContext(ListenerName listenerName, String mechanism, Map configs) {
+ if (listenerName == null)
+ throw new IllegalArgumentException("listenerName should not be null for SERVER");
+ if (mechanism == null)
+ throw new IllegalArgumentException("mechanism should not be null for SERVER");
+ String globalContextName = GLOBAL_CONTEXT_NAME_SERVER;
+ String listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER;
+ Password jaasConfigArgs = (Password) configs.get(mechanism.toLowerCase(Locale.ROOT) + "." + SaslConfigs.SASL_JAAS_CONFIG);
+ if (jaasConfigArgs == null && configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
+ LOG.warn("Server config {} should be prefixed with SASL mechanism name, ignoring config", SaslConfigs.SASL_JAAS_CONFIG);
+ return load(Type.SERVER, listenerContextName, globalContextName, jaasConfigArgs);
+ }
+
+ /**
+ * Returns an instance of this class.
*
- * For contextType CLIENT, if JAAS configuration property @link SaslConfigs#SASL_JAAS_CONFIG} is specified,
+ * If JAAS configuration property {@link SaslConfigs#SASL_JAAS_CONFIG} is specified,
* the configuration object is created by parsing the property value. Otherwise, the default Configuration
* is returned. The context name is always `KafkaClient`.
*
- * @throws IllegalArgumentException if JAAS configuration property is specified for contextType SERVER, if
- * listenerName is not defined for contextType SERVER of if listenerName is defined for contextType CLIENT.
*/
- public static JaasContext load(JaasContext.Type contextType, ListenerName listenerName,
- Map configs) {
- String listenerContextName;
- String globalContextName;
- switch (contextType) {
- case CLIENT:
- if (listenerName != null)
- throw new IllegalArgumentException("listenerName should be null for CLIENT");
- globalContextName = GLOBAL_CONTEXT_NAME_CLIENT;
- listenerContextName = null;
- break;
- case SERVER:
- if (listenerName == null)
- throw new IllegalArgumentException("listenerName should not be null for SERVER");
- globalContextName = GLOBAL_CONTEXT_NAME_SERVER;
- listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER;
- break;
- default:
- throw new IllegalArgumentException("Unexpected context type " + contextType);
- }
- return load(contextType, listenerContextName, globalContextName, configs);
+ public static JaasContext loadClientContext(Map configs) {
+ String globalContextName = GLOBAL_CONTEXT_NAME_CLIENT;
+ Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
+ return load(JaasContext.Type.CLIENT, null, globalContextName, jaasConfigArgs);
}
static JaasContext load(JaasContext.Type contextType, String listenerContextName,
- String globalContextName, Map configs) {
- Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
+ String globalContextName, Password jaasConfigArgs) {
if (jaasConfigArgs != null) {
- if (contextType == JaasContext.Type.SERVER)
- throw new IllegalArgumentException("JAAS config property not supported for server");
- else {
- JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value());
- AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(globalContextName);
- int numModules = clientModules == null ? 0 : clientModules.length;
- if (numModules != 1)
- throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be 1 module");
- return new JaasContext(globalContextName, contextType, jaasConfig);
- }
+ JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value());
+ AppConfigurationEntry[] contextModules = jaasConfig.getAppConfigurationEntry(globalContextName);
+ if (contextModules == null || contextModules.length == 0)
+ throw new IllegalArgumentException("JAAS config property does not contain any login modules");
+ else if (contextModules.length != 1)
+ throw new IllegalArgumentException("JAAS config property contains " + contextModules.length + " login modules, should be 1 module");
+ return new JaasContext(globalContextName, contextType, jaasConfig);
} else
return defaultContext(contextType, listenerContextName, globalContextName);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
index aa39101..96e7426 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
@@ -16,18 +16,16 @@
*/
package org.apache.kafka.common.security.authenticator;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class CredentialCache {
- private final Map> cacheMap = new HashMap<>();
+ private final ConcurrentHashMap> cacheMap = new ConcurrentHashMap<>();
public Cache createCache(String mechanism, Class credentialClass) {
Cache cache = new Cache(credentialClass);
- cacheMap.put(mechanism, cache);
- return cache;
+ Cache oldCache = (Cache) cacheMap.putIfAbsent(mechanism, cache);
+ return oldCache == null ? cache : oldCache;
}
@SuppressWarnings("unchecked")
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index a576e37..dc264c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -64,7 +64,7 @@ public class LoginManager {
* shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more
* complicated to do the latter without making the consumer API more complex.
*/
- public static LoginManager acquireLoginManager(JaasContext jaasContext, boolean hasKerberos,
+ public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, boolean hasKerberos,
Map configs) throws IOException, LoginException {
synchronized (LoginManager.class) {
// SASL_JAAS_CONFIG is only supported by clients
@@ -73,7 +73,7 @@ public class LoginManager {
if (jaasContext.type() == JaasContext.Type.CLIENT && jaasConfigValue != null) {
loginManager = DYNAMIC_INSTANCES.get(jaasConfigValue);
if (loginManager == null) {
- loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue);
+ loginManager = new LoginManager(jaasContext, saslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM), configs, jaasConfigValue);
DYNAMIC_INSTANCES.put(jaasConfigValue, loginManager);
}
} else {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 6fcca57..ca6e9d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -101,8 +101,8 @@ public class SaslServerAuthenticator implements Authenticator {
private final SecurityProtocol securityProtocol;
private final ListenerName listenerName;
private final String connectionId;
- private final JaasContext jaasContext;
- private final Subject subject;
+ private final Map jaasContexts;
+ private final Map subjects;
private final CredentialCache credentialCache;
private final TransportLayer transportLayer;
private final Set enabledMechanisms;
@@ -128,19 +128,17 @@ public class SaslServerAuthenticator implements Authenticator {
public SaslServerAuthenticator(Map configs,
String connectionId,
- JaasContext jaasContext,
- Subject subject,
+ Map jaasContexts,
+ Map subjects,
KerberosShortNamer kerberosNameParser,
CredentialCache credentialCache,
ListenerName listenerName,
SecurityProtocol securityProtocol,
TransportLayer transportLayer,
DelegationTokenCache tokenCache) throws IOException {
- if (subject == null)
- throw new IllegalArgumentException("subject cannot be null");
this.connectionId = connectionId;
- this.jaasContext = jaasContext;
- this.subject = subject;
+ this.jaasContexts = jaasContexts;
+ this.subjects = subjects;
this.credentialCache = credentialCache;
this.listenerName = listenerName;
this.securityProtocol = securityProtocol;
@@ -154,6 +152,12 @@ public class SaslServerAuthenticator implements Authenticator {
if (enabledMechanisms == null || enabledMechanisms.isEmpty())
throw new IllegalArgumentException("No SASL mechanisms are enabled");
this.enabledMechanisms = new HashSet<>(enabledMechanisms);
+ for (String mechanism : enabledMechanisms) {
+ if (!jaasContexts.containsKey(mechanism))
+ throw new IllegalArgumentException("Jaas context not specified for SASL mechanism " + mechanism);
+ if (!subjects.containsKey(mechanism))
+ throw new IllegalArgumentException("Subject cannot be null for SASL mechanism " + mechanism);
+ }
// Note that the old principal builder does not support SASL, so we do not need to pass the
// authenticator or the transport layer
@@ -162,8 +166,9 @@ public class SaslServerAuthenticator implements Authenticator {
private void createSaslServer(String mechanism) throws IOException {
this.saslMechanism = mechanism;
+ Subject subject = subjects.get(mechanism);
if (!ScramMechanism.isScram(mechanism))
- callbackHandler = new SaslServerCallbackHandler(jaasContext);
+ callbackHandler = new SaslServerCallbackHandler(jaasContexts.get(mechanism));
else
callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class), tokenCache);
callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
index 8d80542..b4875d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
@@ -76,9 +76,9 @@ public final class ScramCredentialUtils {
return props;
}
- public static void createCache(CredentialCache cache, Collection enabledMechanisms) {
+ public static void createCache(CredentialCache cache, Collection mechanisms) {
for (String mechanism : ScramMechanism.mechanismNames()) {
- if (enabledMechanisms.contains(mechanism))
+ if (mechanisms.contains(mechanism))
cache.createCache(mechanism, ScramCredential.class);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 0d1fbf9..c54260d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -144,14 +144,13 @@ public class SslFactory implements Reconfigurable {
}
@Override
- public boolean validateReconfiguration(Map configs) {
+ public void validateReconfiguration(Map configs) {
try {
SecurityStore newKeystore = maybeCreateNewKeystore(configs);
if (newKeystore != null)
createSSLContext(newKeystore);
- return true;
} catch (Exception e) {
- throw new KafkaException("Validation of dynamic config update failed", e);
+ throw new ConfigException("Validation of dynamic config update failed", e);
}
}
@@ -163,7 +162,7 @@ public class SslFactory implements Reconfigurable {
this.sslContext = createSSLContext(newKeystore);
this.keystore = newKeystore;
} catch (Exception e) {
- throw new KafkaException("Reconfiguration of SSL keystore failed", e);
+ throw new ConfigException("Reconfiguration of SSL keystore failed", e);
}
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 074df10..071deed 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.config;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metrics.FakeMetricsReporter;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsReporter;
@@ -140,6 +141,54 @@ public class AbstractConfigTest {
}
@Test
+ public void testValuesWithSecondaryPrefix() {
+ String prefix = "listener.name.listener1.";
+ Password saslJaasConfig1 = new Password("test.myLoginModule1 required;");
+ Password saslJaasConfig2 = new Password("test.myLoginModule2 required;");
+ Password saslJaasConfig3 = new Password("test.myLoginModule3 required;");
+ Properties props = new Properties();
+ props.put("listener.name.listener1.test-mechanism.sasl.jaas.config", saslJaasConfig1.value());
+ props.put("test-mechanism.sasl.jaas.config", saslJaasConfig2.value());
+ props.put("sasl.jaas.config", saslJaasConfig3.value());
+ props.put("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2");
+ props.put("listener.name.listener1.gssapi.sasl.kerberos.service.name", "testkafka");
+ props.put("listener.name.listener1.gssapi.sasl.kerberos.min.time.before.relogin", "60000");
+ props.put("ssl.provider", "TEST");
+ TestSecurityConfig config = new TestSecurityConfig(props);
+ Map valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix);
+
+ // prefix with mechanism overrides global
+ assertTrue(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
+ assertTrue(config.unused().contains("test-mechanism.sasl.jaas.config"));
+ assertEquals(saslJaasConfig1, valuesWithPrefixOverride.get("test-mechanism.sasl.jaas.config"));
+ assertEquals(saslJaasConfig3, valuesWithPrefixOverride.get("sasl.jaas.config"));
+ assertFalse(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
+ assertFalse(config.unused().contains("test-mechanism.sasl.jaas.config"));
+ assertFalse(config.unused().contains("sasl.jaas.config"));
+
+ // prefix with mechanism overrides default
+ assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
+ assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));
+ assertFalse(config.unused().contains("gssapi.sasl.kerberos.kinit.cmd"));
+ assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
+ assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.kinit.cmd"));
+ assertFalse(config.unused().contains("listener.name.listener1.sasl.kerberos.kinit.cmd"));
+
+ // prefix override for mechanism with no default
+ assertFalse(config.unused().contains("sasl.kerberos.service.name"));
+ assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+ assertFalse(config.unused().contains("gssapi.sasl.kerberos.service.name"));
+ assertFalse(config.unused().contains("sasl.kerberos.service.name"));
+ assertEquals("testkafka", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.service.name"));
+ assertFalse(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
+ // unset with no default
+ assertTrue(config.unused().contains("ssl.provider"));
+ assertNull(valuesWithPrefixOverride.get("gssapi.ssl.provider"));
+ assertTrue(config.unused().contains("ssl.provider"));
+ }
+
+ @Test
public void testValuesWithPrefixAllOrNothing() {
String prefix1 = "prefix1.";
String prefix2 = "prefix2.";
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index 86d26d4..6072bf5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -26,9 +26,10 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Map;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SaslChannelBuilderTest {
@@ -37,20 +38,20 @@ public class SaslChannelBuilderTest {
public void testCloseBeforeConfigureIsIdempotent() {
SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT);
builder.close();
- assertNull(builder.loginManager());
+ assertTrue(builder.loginManagers().isEmpty());
builder.close();
- assertNull(builder.loginManager());
+ assertTrue(builder.loginManagers().isEmpty());
}
@Test
public void testCloseAfterConfigIsIdempotent() {
SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT);
builder.configure(new HashMap());
- assertNotNull(builder.loginManager());
+ assertNotNull(builder.loginManagers().get("PLAIN"));
builder.close();
- assertNull(builder.loginManager());
+ assertTrue(builder.loginManagers().isEmpty());
builder.close();
- assertNull(builder.loginManager());
+ assertTrue(builder.loginManagers().isEmpty());
}
@Test
@@ -61,17 +62,18 @@ public class SaslChannelBuilderTest {
builder.configure(Collections.singletonMap(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "1"));
fail("Exception should have been thrown");
} catch (KafkaException e) {
- assertNull(builder.loginManager());
+ assertTrue(builder.loginManagers().isEmpty());
}
builder.close();
- assertNull(builder.loginManager());
+ assertTrue(builder.loginManagers().isEmpty());
}
private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol) {
TestJaasConfig jaasConfig = new TestJaasConfig();
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap());
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
- return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, new ListenerName("PLAIN"),
+ Map jaasContexts = Collections.singletonMap("PLAIN", jaasContext);
+ return new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, new ListenerName("PLAIN"),
false, "PLAIN", true, null, null);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
index 49d5a86..e8535d2 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
@@ -188,8 +188,8 @@ public class JaasContextTest {
"KafkaServer { test.LoginModuleDefault required; };",
"plaintext.KafkaServer { test.LoginModuleOverride requisite; };"
));
- JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
- Collections.emptyMap());
+ JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"),
+ "SOME-MECHANISM", Collections.emptyMap());
assertEquals("plaintext.KafkaServer", context.name());
assertEquals(JaasContext.Type.SERVER, context.type());
assertEquals(1, context.configurationEntries().size());
@@ -203,8 +203,8 @@ public class JaasContextTest {
"KafkaServer { test.LoginModule required; };",
"other.KafkaServer { test.LoginModuleOther requisite; };"
));
- JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
- Collections.emptyMap());
+ JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"),
+ "SOME-MECHANISM", Collections.emptyMap());
assertEquals("KafkaServer", context.name());
assertEquals(JaasContext.Type.SERVER, context.type());
assertEquals(1, context.configurationEntries().size());
@@ -215,24 +215,13 @@ public class JaasContextTest {
@Test(expected = IllegalArgumentException.class)
public void testLoadForServerWithWrongListenerName() throws IOException {
writeConfiguration("Server", "test.LoginModule required;");
- JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
- Collections.emptyMap());
- }
-
- /**
- * ListenerName can only be used with Type.SERVER.
- */
- @Test(expected = IllegalArgumentException.class)
- public void testLoadForClientWithListenerName() {
- JaasContext.load(JaasContext.Type.CLIENT, new ListenerName("foo"),
+ JaasContext.loadServerContext(new ListenerName("plaintext"), "SOME-MECHANISM",
Collections.emptyMap());
}
private AppConfigurationEntry configurationEntry(JaasContext.Type contextType, String jaasConfigProp) {
- Map configs = new HashMap<>();
- if (jaasConfigProp != null)
- configs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigProp));
- JaasContext context = JaasContext.load(contextType, null, contextType.name(), configs);
+ Password saslJaasConfig = jaasConfigProp == null ? null : new Password(jaasConfigProp);
+ JaasContext context = JaasContext.load(contextType, null, contextType.name(), saslJaasConfig);
List entries = context.configurationEntries();
assertEquals(1, entries.size());
return entries.get(0);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index e3d6b7a..ef2a075 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -715,7 +715,7 @@ public class SaslAuthenticatorTest {
* property override is used during authentication.
*/
@Test
- public void testDynamicJaasConfiguration() throws Exception {
+ public void testClientDynamicJaasConfiguration() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN"));
@@ -756,6 +756,37 @@ public class SaslAuthenticatorTest {
}
}
+ /**
+ * Tests dynamic JAAS configuration property for SASL server. Invalid server credentials
+ * are set in the static JVM-wide configuration instance to ensure that the dynamic
+ * property override is used during authentication.
+ */
+ @Test
+ public void testServerDynamicJaasConfiguration() throws Exception {
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+ saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN"));
+ Map serverOptions = new HashMap<>();
+ serverOptions.put("user_user1", "user1-secret");
+ serverOptions.put("user_user2", "user2-secret");
+ saslServerConfigs.put("listener.name.sasl_ssl.plain." + SaslConfigs.SASL_JAAS_CONFIG,
+ TestJaasConfig.jaasConfigProperty("PLAIN", serverOptions));
+ TestJaasConfig staticJaasConfig = new TestJaasConfig();
+ staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(),
+ Collections.emptyMap());
+ staticJaasConfig.setClientOptions("PLAIN", "user1", "user1-secret");
+ Configuration.setConfiguration(staticJaasConfig);
+ server = createEchoServer(securityProtocol);
+
+ // Check that 'user1' can connect with static Jaas config
+ createAndCheckClientConnection(securityProtocol, "1");
+
+ // Check that user 'user2' can also connect with a Jaas config override
+ saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
+ TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret"));
+ createAndCheckClientConnection(securityProtocol, "2");
+ }
+
@Test
public void testJaasConfigurationForListener() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
@@ -986,18 +1017,19 @@ public class SaslAuthenticatorTest {
throws Exception {
final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
final Map configs = Collections.emptyMap();
- final JaasContext jaasContext = JaasContext.load(JaasContext.Type.SERVER, listenerName, configs);
+ final JaasContext jaasContext = JaasContext.loadServerContext(listenerName, saslMechanism, configs);
+ final Map jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
boolean isScram = ScramMechanism.isScram(saslMechanism);
if (isScram)
ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
- SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext,
+ SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts,
securityProtocol, listenerName, false, saslMechanism, true, credentialCache, null) {
@Override
protected SaslServerAuthenticator buildServerAuthenticator(Map configs, String id,
- TransportLayer transportLayer, Subject subject) throws IOException {
- return new SaslServerAuthenticator(configs, id, jaasContext, subject, null,
+ TransportLayer transportLayer, Map subjects) throws IOException {
+ return new SaslServerAuthenticator(configs, id, jaasContexts, subjects, null,
credentialCache, listenerName, securityProtocol, transportLayer, null) {
@Override
@@ -1032,8 +1064,10 @@ public class SaslAuthenticatorTest {
final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
final Map configs = Collections.emptyMap();
- final JaasContext jaasContext = JaasContext.load(JaasContext.Type.CLIENT, null, configs);
- SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext,
+ final JaasContext jaasContext = JaasContext.loadClientContext(configs);
+ final Map jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
+
+ SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts,
securityProtocol, listenerName, false, saslMechanism, true, null, null) {
@Override
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 51ea58e..72c2969 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -51,7 +51,7 @@ public class SaslServerAuthenticatorTest {
TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
- SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer);
+ SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName());
final Capture size = EasyMock.newCapture();
EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer() {
@@ -72,7 +72,7 @@ public class SaslServerAuthenticatorTest {
TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
- SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer);
+ SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName());
final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243);
final Struct headerStruct = header.toStruct();
@@ -106,12 +106,13 @@ public class SaslServerAuthenticatorTest {
}
}
- private SaslServerAuthenticator setupAuthenticator(Map configs, TransportLayer transportLayer) throws IOException {
+ private SaslServerAuthenticator setupAuthenticator(Map configs, TransportLayer transportLayer, String mechanism) throws IOException {
TestJaasConfig jaasConfig = new TestJaasConfig();
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap());
- JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
- Subject subject = new Subject();
- return new SaslServerAuthenticator(configs, "node", jaasContext, subject, null, new CredentialCache(),
+ Map jaasContexts = Collections.singletonMap(mechanism,
+ new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig));
+ Map subjects = Collections.singletonMap(mechanism, new Subject());
+ return new SaslServerAuthenticator(configs, "node", jaasContexts, subjects, null, new CredentialCache(),
new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, new DelegationTokenCache(ScramMechanism.mechanismNames()));
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
index 5336fd7..dafa79d 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
@@ -54,6 +54,20 @@ public class TestJaasConfig extends Configuration {
return new Password(loginModule(mechanism) + " required username=" + username + " password=" + password + ";");
}
+ public static Password jaasConfigProperty(String mechanism, Map options) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(loginModule(mechanism));
+ builder.append(" required");
+ for (Map.Entry option : options.entrySet()) {
+ builder.append(' ');
+ builder.append(option.getKey());
+ builder.append('=');
+ builder.append(option.getValue());
+ }
+ builder.append(';');
+ return new Password(builder.toString());
+ }
+
public void setClientOptions(String saslMechanism, String clientUsername, String clientPassword) {
Map options = new HashMap<>();
if (clientUsername != null)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a8e1249..f36fc79 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -54,11 +54,13 @@ object KafkaController extends Logging {
}
-class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, brokerInfo: BrokerInfo,
+class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo,
tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = s"[Controller id=${config.brokerId}] "
+ @volatile private var brokerInfo = initialBrokerInfo
+
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
val controllerContext = new ControllerContext
@@ -77,6 +79,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
+ private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
@@ -274,6 +277,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
+ unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)
// reset topic deletion manager
topicDeletionManager.reset()
@@ -360,6 +364,23 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
}
+ registerBrokerModificationsHandler(newBrokers)
+ }
+
+ private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
+ debug(s"Register BrokerModifications handler for $brokerIds")
+ brokerIds.foreach { brokerId =>
+ val brokerModificationsHandler = new BrokerModificationsHandler(this, eventManager, brokerId)
+ zkClient.registerZNodeChangeHandler(brokerModificationsHandler)
+ brokerModificationsHandlers.put(brokerId, brokerModificationsHandler)
+ }
+ }
+
+ private def unregisterBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
+ debug(s"Unregister BrokerModifications handler for $brokerIds")
+ brokerIds.foreach { brokerId =>
+ brokerModificationsHandlers.remove(brokerId).foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
+ }
}
/*
@@ -374,6 +395,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
info(s"Removed $deadBrokersThatWereShuttingDown from list of shutting down brokers.")
val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
onReplicasBecomeOffline(allReplicasOnDeadBrokers)
+
+ unregisterBrokerModificationsHandler(deadBrokers)
+ }
+
+ private def onBrokerUpdate(updatedBrokers: Seq[Int]) {
+ info(s"Broker info update callback for ${updatedBrokers.mkString(",")}")
+ sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
}
/**
@@ -613,6 +641,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
+ // register broker modifications handlers
+ registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
// start the channel manager
@@ -1209,6 +1239,29 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
+ case object BrokerModifications extends ControllerEvent {
+ override def state: ControllerState = ControllerState.BrokerChange
+
+ override def process(): Unit = {
+ if (!isActive) return
+ val curBrokers = zkClient.getAllBrokersInCluster.toSet
+ val updatedBrokers = controllerContext.liveBrokers.filter { broker =>
+ val existingBroker = curBrokers.find(_.id == broker.id)
+ existingBroker match {
+ case Some(b) => broker.endPoints != b.endPoints
+ case None => false
+ }
+ }
+ if (updatedBrokers.nonEmpty) {
+ val updatedBrokerIdsSorted = updatedBrokers.map(_.id).toSeq.sorted
+ info(s"Updated brokers: $updatedBrokers")
+
+ controllerContext.liveBrokers = curBrokers // Update broker metadata
+ onBrokerUpdate(updatedBrokerIdsSorted)
+ }
+ }
+ }
+
case object TopicChange extends ControllerEvent {
override def state: ControllerState = ControllerState.TopicChange
@@ -1458,7 +1511,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
class BrokerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = BrokerIdsZNode.path
- override def handleChildChange(): Unit = eventManager.put(controller.BrokerChange)
+ override def handleChildChange(): Unit = {
+ eventManager.put(controller.BrokerChange)
+ }
+}
+
+class BrokerModificationsHandler(controller: KafkaController, eventManager: ControllerEventManager, brokerId: Int) extends ZNodeChangeHandler {
+ override val path: String = BrokerIdZNode.path(brokerId)
+
+ override def handleDataChange(): Unit = {
+ eventManager.put(controller.BrokerModifications)
+ }
}
class TopicChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index f6ec974..c11ebcb 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -54,7 +54,6 @@ import scala.util.control.ControlThrowable
*/
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
- private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
private val maxQueuedRequests = config.queuedMaxRequests
private val maxConnectionsPerIp = config.maxConnectionsPerIp
@@ -72,7 +71,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private val processors = new ConcurrentHashMap[Int, Processor]()
private var nextProcessorId = 0
- private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
+ private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
private var connectionQuotas: ConnectionQuotas = _
private var stoppedProcessingRequests = false
@@ -82,7 +81,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
- createProcessors(config.numNetworkThreads)
+ createProcessors(config.numNetworkThreads, config.listeners)
}
newGauge("NetworkProcessorAvgIdlePercent",
@@ -111,14 +110,17 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
info("Started " + acceptors.size + " acceptor threads")
}
- private def createProcessors(newProcessorsPerListener: Int): Unit = synchronized {
+ private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
+
+ private def createProcessors(newProcessorsPerListener: Int,
+ endpoints: Seq[EndPoint]): Unit = synchronized {
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
val numProcessorThreads = config.numNetworkThreads
- config.listeners.foreach { endpoint =>
+ endpoints.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
@@ -130,12 +132,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
}
listenerProcessors.foreach(p => processors.put(p.id, p))
- val acceptor = acceptors.getOrElseUpdate(endpoint, {
- val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
- KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
- acceptor.awaitStartup()
- acceptor
- })
+ val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
+ KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
+ acceptor.awaitStartup()
+ acceptors.put(endpoint, acceptor)
acceptor.addProcessors(listenerProcessors)
}
}
@@ -153,7 +153,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def stopProcessingRequests() = {
info("Stopping socket server request processors")
this.synchronized {
- acceptors.values.foreach(_.shutdown)
+ acceptors.asScala.values.foreach(_.shutdown)
processors.asScala.values.foreach(_.shutdown)
requestChannel.clear()
stoppedProcessingRequests = true
@@ -161,12 +161,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
info("Stopped socket server request processors")
}
- def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = {
+ def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
if (newNumNetworkThreads > oldNumNetworkThreads)
- createProcessors(newNumNetworkThreads - oldNumNetworkThreads)
+ createProcessors(newNumNetworkThreads - oldNumNetworkThreads, config.listeners)
else if (newNumNetworkThreads < oldNumNetworkThreads)
- acceptors.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
+ acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
}
/**
@@ -185,9 +185,22 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def boundPort(listenerName: ListenerName): Int = {
try {
- acceptors(endpoints(listenerName)).serverChannel.socket.getLocalPort
+ acceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort
} catch {
- case e: Exception => throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
+ case e: Exception =>
+ throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
+ }
+ }
+
+ def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
+ info(s"Adding listeners for endpoints $listenersAdded")
+ createProcessors(config.numNetworkThreads, listenersAdded)
+ }
+
+ def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
+ info(s"Removing listeners for endpoints $listenersRemoved")
+ listenersRemoved.foreach { endpoint =>
+ acceptors.asScala.remove(endpoint).foreach(_.shutdown())
}
}
@@ -239,8 +252,8 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
* Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
*/
def shutdown(): Unit = {
- alive.set(false)
- wakeup()
+ if (alive.getAndSet(false))
+ wakeup()
shutdownLatch.await()
}
@@ -312,6 +325,11 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
}
+ override def shutdown(): Unit = {
+ super.shutdown()
+ processors.foreach(_.shutdown())
+ }
+
/**
* Accept loop that checks for new connection attempts
*/
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala
index 120e8f9..0e7ebb6 100644
--- a/core/src/main/scala/kafka/security/CredentialProvider.scala
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -17,7 +17,7 @@
package kafka.security
-import java.util.{List, Properties}
+import java.util.{Collection, Properties}
import org.apache.kafka.common.security.authenticator.CredentialCache
import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism}
@@ -25,10 +25,10 @@ import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef._
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
-class CredentialProvider(saslEnabledMechanisms: List[String], val tokenCache: DelegationTokenCache) {
+class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: DelegationTokenCache) {
val credentialCache = new CredentialCache
- ScramCredentialUtils.createCache(credentialCache, saslEnabledMechanisms)
+ ScramCredentialUtils.createCache(credentialCache, scramMechanisms)
def updateCredentials(username: String, config: Properties) {
for (mechanism <- ScramMechanism.values()) {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 9c85f9b..a95de0a 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -17,20 +17,22 @@
package kafka.server
-import java.nio.charset.StandardCharsets
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogConfig, LogManager}
import kafka.server.DynamicBrokerConfig._
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.{ConfigDef, ConfigException, SslConfigs}
-import org.apache.kafka.common.network.ListenerReconfigurable
import org.apache.kafka.common.metrics.MetricsReporter
-import org.apache.kafka.common.utils.{Base64, Utils}
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
+import org.apache.kafka.common.security.authenticator.LoginManager
+import org.apache.kafka.common.utils.Utils
import scala.collection._
import scala.collection.JavaConverters._
@@ -71,11 +73,7 @@ import scala.collection.JavaConverters._
*/
object DynamicBrokerConfig {
- private val DynamicPasswordConfigs = Set(
- SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
- SslConfigs.SSL_KEY_PASSWORD_CONFIG
- )
- private val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
+ private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
val AllDynamicConfigs = mutable.Set[String]()
AllDynamicConfigs ++= DynamicSecurityConfigs
@@ -83,11 +81,17 @@ object DynamicBrokerConfig {
AllDynamicConfigs ++= DynamicLogConfig.ReconfigurableConfigs
AllDynamicConfigs ++= DynamicThreadPool.ReconfigurableConfigs
AllDynamicConfigs ++= Set(KafkaConfig.MetricReporterClassesProp)
+ AllDynamicConfigs ++= DynamicListenerConfig.ReconfigurableConfigs
- private val PerBrokerConfigs = DynamicSecurityConfigs
+ private val PerBrokerConfigs = DynamicSecurityConfigs ++
+ DynamicListenerConfig.ReconfigurableConfigs
val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
+  // Names of all dynamically updatable configs whose declared type is PASSWORD.
+  // Computed once, from the contents of `AllDynamicConfigs` at this point of initialization.
+  private[server] val DynamicPasswordConfigs = {
+    val passwordTypedNames = KafkaConfig.configKeys.collect {
+      case (configName, configKey) if configKey.`type` == ConfigDef.Type.PASSWORD => configName
+    }.toSet
+    AllDynamicConfigs.intersect(passwordTypedNames)
+  }
def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
name match {
@@ -123,11 +127,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig = kafkaConfig
+ private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
private[server] def initialize(zkClient: KafkaZkClient): Unit = {
val adminZkClient = new AdminZkClient(zkClient)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
- updateBrokerConfig(kafkaConfig.brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString))
+ val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
+ val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
+ updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
}
def addReconfigurables(kafkaServer: KafkaServer): Unit = {
@@ -136,6 +143,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
+ addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
}
def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
@@ -187,22 +195,37 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
}
+  /**
+   * Builds a `PasswordEncoder` for the given secret, using the key factory algorithm,
+   * cipher algorithm, key length and iteration count read from `kafkaConfig`.
+   *
+   * @param secret the encoder secret, if one is configured
+   * @return the encoder, or `None` when `secret` is empty
+   */
+  private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
+    for (encoderSecret <- secret) yield {
+      val keyFactoryAlgorithm = kafkaConfig.passwordEncoderKeyFactoryAlgorithm
+      val cipherAlgorithm = kafkaConfig.passwordEncoderCipherAlgorithm
+      val keyLength = kafkaConfig.passwordEncoderKeyLength
+      val iterations = kafkaConfig.passwordEncoderIterations
+      new PasswordEncoder(encoderSecret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations)
+    }
+  }
+
+  /**
+   * Returns the encoder built from the configured password encoder secret.
+   *
+   * @throws ConfigException if no encoder was created because the secret is not configured
+   */
+  private def passwordEncoder: PasswordEncoder = dynamicConfigPasswordEncoder match {
+    case Some(encoder) => encoder
+    case None => throw new ConfigException("Password encoder secret not configured")
+  }
+
private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = {
val props = configProps.clone().asInstanceOf[Properties]
- // TODO (KAFKA-6246): encrypt passwords
+
def encodePassword(configName: String): Unit = {
val value = props.getProperty(configName)
if (value != null) {
if (!perBrokerConfig)
throw new ConfigException("Password config can be defined only at broker level")
- props.setProperty(configName, Base64.encoder.encodeToString(value.getBytes(StandardCharsets.UTF_8)))
+ props.setProperty(configName, passwordEncoder.encode(new Password(value)))
}
}
DynamicPasswordConfigs.foreach(encodePassword)
props
}
- private[server] def fromPersistentProps(persistentProps: Properties, perBrokerConfig: Boolean): Properties = {
+ private[server] def fromPersistentProps(persistentProps: Properties,
+ perBrokerConfig: Boolean): Properties = {
val props = persistentProps.clone().asInstanceOf[Properties]
// Remove all invalid configs from `props`
@@ -219,17 +242,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
if (!perBrokerConfig)
removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored")
- // TODO (KAFKA-6246): encrypt passwords
def decodePassword(configName: String): Unit = {
val value = props.getProperty(configName)
if (value != null) {
- props.setProperty(configName, new String(Base64.decoder.decode(value), StandardCharsets.UTF_8))
+ try {
+ props.setProperty(configName, passwordEncoder.decode(value).value)
+ } catch {
+ case e: Exception =>
+ error(s"Dynamic password config $configName could not be decoded, ignoring.", e)
+ props.remove(configName)
+ }
}
}
+
DynamicPasswordConfigs.foreach(decodePassword)
props
}
+  /**
+   * Re-encodes persisted password configs after the password encoder secret has changed.
+   *
+   * If the secret has changed, password.encoder.old.secret contains the old secret that was used
+   * to encode the configs in ZK. Every persisted dynamic password config is decoded using the old
+   * secret, re-encoded using the current secret and written back through `adminZkClient`.
+   * A value that cannot be decoded with the old secret is left as it is, since the old secret may
+   * not have been removed during an earlier broker restart.
+   *
+   * @param persistentProps broker configs as fetched from ZK; this instance is not modified
+   * @param adminZkClient   client used to persist the re-encoded configs for this broker
+   * @return a copy of `persistentProps` in which successfully decoded passwords are re-encoded
+   */
+  private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
+    val props = persistentProps.clone().asInstanceOf[Properties]
+    // Re-encode (and write to ZK) only when at least one password config is actually persisted.
+    // The previous negated check did the opposite and skipped exactly the configs that needed it.
+    if (props.asScala.keySet.exists(DynamicPasswordConfigs.contains)) {
+      maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
+        DynamicPasswordConfigs.foreach { configName =>
+          val value = props.getProperty(configName)
+          if (value != null) {
+            val decoded = try {
+              Some(passwordDecoder.decode(value).value)
+            } catch {
+              case _: Exception =>
+                debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.")
+                None
+            }
+            decoded.foreach { value => props.put(configName, passwordEncoder.encode(new Password(value))) }
+          }
+        }
+        adminZkClient.changeBrokerConfig(Seq(kafkaConfig.brokerId), props)
+      }
+    }
+    props
+  }
+
private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
if (invalidPropNames.nonEmpty)
@@ -331,14 +387,18 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
newProps ++= staticBrokerConfigs
overrideProps(newProps, dynamicDefaultConfigs)
overrideProps(newProps, dynamicBrokerConfigs)
- val newConfig = processReconfiguration(newProps, validateOnly = false)
+ val oldConfig = currentConfig
+ val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
if (newConfig ne currentConfig) {
currentConfig = newConfig
- kafkaConfig.updateCurrentConfig(currentConfig)
+ kafkaConfig.updateCurrentConfig(newConfig)
+
+ // Process BrokerReconfigurable updates after current config is updated
+ brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
}
}
- private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): KafkaConfig = {
+ private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): (KafkaConfig, List[BrokerReconfigurable]) = {
val newConfig = new KafkaConfig(newProps.asJava, !validateOnly, None)
val updatedMap = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
if (updatedMap.nonEmpty) {
@@ -352,16 +412,22 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
val updatedKeys = updatedConfigs(newValues, oldValues).keySet
if (needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys))
- processReconfigurable(listenerReconfigurable, newValues, customConfigs, validateOnly)
+ processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
case reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet))
- processReconfigurable(reconfigurable, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+ processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
}
+
+ // BrokerReconfigurable updates are processed after config is updated. Only do the validation here.
+ val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
brokerReconfigurables.foreach { reconfigurable =>
- if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet))
- processBrokerReconfigurable(reconfigurable, currentConfig, newConfig, validateOnly)
+ if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet)) {
+ reconfigurable.validateReconfiguration(newConfig)
+ if (!validateOnly)
+ brokerReconfigurablesToUpdate += reconfigurable
+ }
}
- newConfig
+ (newConfig, brokerReconfigurablesToUpdate.toList)
} catch {
case e: Exception =>
if (!validateOnly)
@@ -370,7 +436,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
}
else
- currentConfig
+ (currentConfig, List.empty)
}
private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String]): Boolean = {
@@ -378,27 +444,25 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
private def processReconfigurable(reconfigurable: Reconfigurable,
+ updatedConfigNames: Set[String],
allNewConfigs: util.Map[String, _],
newCustomConfigs: util.Map[String, Object],
validateOnly: Boolean): Unit = {
val newConfigs = new util.HashMap[String, Object]
allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
newConfigs.putAll(newCustomConfigs)
- if (validateOnly) {
- if (!reconfigurable.validateReconfiguration(newConfigs))
- throw new ConfigException("Validation of dynamic config update failed")
- } else
- reconfigurable.reconfigure(newConfigs)
- }
+ try {
+ reconfigurable.validateReconfiguration(newConfigs)
+ } catch {
+ case e: ConfigException => throw e
+ case _: Exception =>
+ throw new ConfigException(s"Validation of dynamic config update of $updatedConfigNames failed with class ${reconfigurable.getClass}")
+ }
- private def processBrokerReconfigurable(reconfigurable: BrokerReconfigurable,
- oldConfig: KafkaConfig,
- newConfig: KafkaConfig,
- validateOnly: Boolean): Unit = {
- if (validateOnly)
- reconfigurable.validateReconfiguration(newConfig)
- else
- reconfigurable.reconfigure(oldConfig, newConfig)
+ if (!validateOnly) {
+ info(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames custom configs: $newCustomConfigs")
+ reconfigurable.reconfigure(newConfigs)
+ }
}
}
@@ -427,11 +491,10 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
DynamicLogConfig.ReconfigurableConfigs.asJava
}
- override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
+ override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
// For update of topic config overrides, only config names and types are validated
// Names and types have already been validated. For consistency with topic config
// validation, no additional validation is performed.
- true
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
@@ -538,7 +601,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
configs
}
- override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
+ override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
val updatedMetricsReporters = metricsReporterClasses(configs)
// Ensure all the reporter classes can be loaded and have a default constructor
@@ -548,10 +611,11 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
}
// Validate the new configuration using every reconfigurable reporter instance that is not being deleted
- currentReporters.values.forall {
+ currentReporters.values.foreach {
case reporter: Reconfigurable =>
- !updatedMetricsReporters.contains(reporter.getClass.getName) || reporter.validateReconfiguration(configs)
- case _ => true
+ if (updatedMetricsReporters.contains(reporter.getClass.getName))
+ reporter.validateReconfiguration(configs)
+ case _ =>
}
}
@@ -588,3 +652,103 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
}
}
+object DynamicListenerConfig {
+
+  // Configs that define the listeners themselves
+  private val listenerConfigNames = Set(
+    KafkaConfig.AdvertisedListenersProp,
+    KafkaConfig.ListenersProp,
+    KafkaConfig.ListenerSecurityProtocolMapProp
+  )
+
+  // SSL configs (the principal builder class is grouped here as well)
+  private val sslConfigNames = Set(
+    KafkaConfig.PrincipalBuilderClassProp,
+    KafkaConfig.SslProtocolProp,
+    KafkaConfig.SslProviderProp,
+    KafkaConfig.SslCipherSuitesProp,
+    KafkaConfig.SslEnabledProtocolsProp,
+    KafkaConfig.SslKeystoreTypeProp,
+    KafkaConfig.SslKeystoreLocationProp,
+    KafkaConfig.SslKeystorePasswordProp,
+    KafkaConfig.SslKeyPasswordProp,
+    KafkaConfig.SslTruststoreTypeProp,
+    KafkaConfig.SslTruststoreLocationProp,
+    KafkaConfig.SslTruststorePasswordProp,
+    KafkaConfig.SslKeyManagerAlgorithmProp,
+    KafkaConfig.SslTrustManagerAlgorithmProp,
+    KafkaConfig.SslEndpointIdentificationAlgorithmProp,
+    KafkaConfig.SslSecureRandomImplementationProp,
+    KafkaConfig.SslClientAuthProp
+  )
+
+  // SASL configs
+  private val saslConfigNames = Set(
+    KafkaConfig.SaslMechanismInterBrokerProtocolProp,
+    KafkaConfig.SaslJaasConfigProp,
+    KafkaConfig.SaslEnabledMechanismsProp,
+    KafkaConfig.SaslKerberosServiceNameProp,
+    KafkaConfig.SaslKerberosKinitCmdProp,
+    KafkaConfig.SaslKerberosTicketRenewWindowFactorProp,
+    KafkaConfig.SaslKerberosTicketRenewJitterProp,
+    KafkaConfig.SaslKerberosMinTimeBeforeReloginProp,
+    KafkaConfig.SaslKerberosPrincipalToLocalRulesProp
+  )
+
+  /** Names of every config handled by `DynamicListenerConfig`: listener, SSL and SASL configs. */
+  val ReconfigurableConfigs = listenerConfigNames ++ sslConfigNames ++ saslConfigNames
+}
+
+/**
+ * Broker reconfigurable that validates and applies dynamic updates of the listener, SSL and
+ * SASL configs named in `DynamicListenerConfig.ReconfigurableConfigs`.
+ *
+ * @param server the broker whose socket server and ZK registration are updated
+ */
+class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable with Logging {
+
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicListenerConfig.ReconfigurableConfigs
+  }
+
+  /**
+   * Validates `newConfig` against the broker's current config (`server.config`):
+   *  - advertised listener names must be a subset of the listener names,
+   *  - listener names must match the keys of the listener security protocol map,
+   *  - for a listener present in both configs, its listener-prefixed configs (other than the
+   *    dynamically updatable security configs) and its security protocol must be unchanged,
+   *  - the inter-broker listener must be an advertised listener.
+   *
+   * @throws ConfigException if any of the rules above is violated
+   */
+  def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+
+    // Listener-prefixed originals of `kafkaConfig` that must stay fixed while the listener exists.
+    // Reads from the config passed in (the helper previously always read `newConfig`, which made
+    // the old/new comparison below a no-op). The prefix is stripped before the lookup because
+    // `DynamicSecurityConfigs` is built from SslConfigs.RECONFIGURABLE_CONFIGS, i.e. names that
+    // carry no listener prefix.
+    def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
+      kafkaConfig.originals.asScala.filter { case (key, _) =>
+        key.startsWith(prefix) && !DynamicSecurityConfigs.contains(key.substring(prefix.length))
+      }
+    }
+
+    val oldConfig = server.config
+    val newListeners = listenersToMap(newConfig.listeners)
+    val newAdvertisedListeners = listenersToMap(newConfig.advertisedListeners)
+    val oldListeners = listenersToMap(oldConfig.listeners)
+    if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
+      throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
+    // NOTE(review): this requires an exact match, so a protocol map with entries for unused
+    // listener names fails validation - confirm whether a subset check is intended.
+    if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet)
+      throw new ConfigException(s"Listeners '$newListeners' and listener map '${newConfig.listenerSecurityProtocolMap}' don't match")
+    newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
+      val prefix = listenerName.configPrefix
+      val newListenerProps = immutableListenerConfigs(newConfig, prefix)
+      val oldListenerProps = immutableListenerConfigs(oldConfig, prefix)
+      if (newListenerProps != oldListenerProps)
+        throw new ConfigException(s"Configs cannot be updated dynamically for existing listener $listenerName, " +
+          "restart broker or create a new listener for update")
+      if (oldConfig.listenerSecurityProtocolMap(listenerName) != newConfig.listenerSecurityProtocolMap(listenerName))
+        throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName")
+    }
+    if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
+      throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}")
+  }
+
+  /**
+   * Applies a listener change: removes listeners that are no longer configured, adds newly
+   * configured ones and then updates the broker info in ZK.
+   *
+   * @param oldConfig config in effect before the update
+   * @param newConfig config in effect after the update
+   */
+  def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    val newListeners = newConfig.listeners
+    val newListenerMap = listenersToMap(newListeners)
+    val oldListeners = oldConfig.listeners
+    val oldListenerMap = listenersToMap(oldListeners)
+    val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName))
+    val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName))
+
+    // Clear SASL login cache to force re-login
+    if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty)
+      LoginManager.closeAll()
+
+    server.socketServer.removeListeners(listenersRemoved)
+    if (listenersAdded.nonEmpty)
+      server.socketServer.addListeners(listenersAdded)
+
+    server.zkClient.updateBrokerInfoInZk(server.createBrokerInfo)
+  }
+
+  // Indexes endpoints by listener name
+  private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =
+    listeners.map(e => (e.listenerName, e)).toMap
+
+}
+
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2dd6951..1f448af 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1291,7 +1291,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleSaslHandshakeRequest(request: RequestChannel.Request) {
- sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms))
+ sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet()))
}
def handleSaslAuthenticateRequest(request: RequestChannel.Request) {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 144dd65..64698f7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -223,6 +223,11 @@ object Defaults {
val DelegationTokenMaxLifeTimeMsDefault = 7 * 24 * 60 * 60 * 1000L
val DelegationTokenExpiryTimeMsDefault = 24 * 60 * 60 * 1000L
val DelegationTokenExpiryCheckIntervalMsDefault = 1 * 60 * 60 * 1000L
+
+ /** ********* Password encryption configuration for dynamic configs *********/
+ val PasswordEncoderCipherAlgorithm = "AES/CBC/PKCS5Padding"
+ val PasswordEncoderKeyLength = 128
+ val PasswordEncoderIterations = 4096
}
object KafkaConfig {
@@ -412,6 +417,7 @@ object KafkaConfig {
/** ********* SASL Configuration ****************/
val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol"
+ val SaslJaasConfigProp = SaslConfigs.SASL_JAAS_CONFIG
val SaslEnabledMechanismsProp = BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG
val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME
val SaslKerberosKinitCmdProp = SaslConfigs.SASL_KERBEROS_KINIT_CMD
@@ -426,6 +432,14 @@ object KafkaConfig {
val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms"
val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"
+ /** ********* Password encryption configuration for dynamic configs *********/
+ val PasswordEncoderSecretProp = "password.encoder.secret"
+ val PasswordEncoderOldSecretProp = "password.encoder.old.secret"
+ val PasswordEncoderKeyFactoryAlgorithmProp = "password.encoder.keyfactory.algorithm"
+ val PasswordEncoderCipherAlgorithmProp = "password.encoder.cipher.algorithm"
+ val PasswordEncoderKeyLengthProp = "password.encoder.key.length"
+ val PasswordEncoderIterationsProp = "password.encoder.iterations"
+
/* Documentation */
/** ********* Zookeeper Configuration ***********/
val ZkConnectDoc = "Zookeeper host string"
@@ -689,6 +703,7 @@ object KafkaConfig {
/** ********* Sasl Configuration ****************/
val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI."
+ val SaslJaasConfigDoc = SaslConfigs.SASL_JAAS_CONFIG_DOC
val SaslEnabledMechanismsDoc = SaslConfigs.SASL_ENABLED_MECHANISMS_DOC
val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC
val SaslKerberosKinitCmdDoc = SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC
@@ -704,6 +719,16 @@ object KafkaConfig {
val DelegationTokenExpiryTimeMsDoc = "The token validity time in seconds before the token needs to be renewed. Default value 1 day."
val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens."
+ /** ********* Password encryption configuration for dynamic configs *********/
+ val PasswordEncoderSecretDoc = "The secret used for encoding dynamically configured passwords for this broker."
+ val PasswordEncoderOldSecretDoc = "The old secret that was used for encoding dynamically configured passwords. " +
+ "This is required only when the secret is updated. If specified, all dynamically encoded passwords are " +
+ s"decoded using this old secret and re-encoded using $PasswordEncoderSecretProp when broker starts up."
+ val PasswordEncoderKeyFactoryAlgorithmDoc = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " +
+ "Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise."
+ val PasswordEncoderCipherAlgorithmDoc = "The Cipher algorithm used for encoding dynamically configured passwords."
+ val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
+ val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."
private val configDef = {
import ConfigDef.Importance._
@@ -898,6 +923,7 @@ object KafkaConfig {
/** ********* Sasl Configuration ****************/
.define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)
+ .define(SaslJaasConfigProp, PASSWORD, null, MEDIUM, SaslJaasConfigDoc)
.define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, MEDIUM, SaslEnabledMechanismsDoc)
.define(SaslKerberosServiceNameProp, STRING, null, MEDIUM, SaslKerberosServiceNameDoc)
.define(SaslKerberosKinitCmdProp, STRING, Defaults.SaslKerberosKinitCmd, MEDIUM, SaslKerberosKinitCmdDoc)
@@ -911,6 +937,13 @@ object KafkaConfig {
.define(DelegationTokenExpiryTimeMsProp, LONG, Defaults.DelegationTokenExpiryTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenExpiryTimeMsDoc)
.define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DelegationTokenExpiryCheckIntervalMsDefault, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc)
+ /** ********* Password encryption configuration for dynamic configs *********/
+ .define(PasswordEncoderSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderSecretDoc)
+ .define(PasswordEncoderOldSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderOldSecretDoc)
+ .define(PasswordEncoderKeyFactoryAlgorithmProp, STRING, null, LOW, PasswordEncoderKeyFactoryAlgorithmDoc)
+ .define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc)
+ .define(PasswordEncoderKeyLengthProp, INT, Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc)
+ .define(PasswordEncoderIterationsProp, INT, Defaults.PasswordEncoderIterations, atLeast(1024), LOW, PasswordEncoderIterationsDoc)
}
def configNames() = configDef.names().asScala.toList.sorted
@@ -942,9 +975,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
def this(props: java.util.Map[_, _]) = this(props, true, None)
def this(props: java.util.Map[_, _], doLog: Boolean) = this(props, doLog, None)
- private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
// Cache the current config to avoid acquiring read lock to access from dynamicConfig
@volatile private var currentConfig = this
+ private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
this.currentConfig = newConfig
@@ -1076,8 +1109,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
- val (interBrokerListenerName, interBrokerSecurityProtocol) = getInterBrokerListenerNameAndSecurityProtocol
-
// We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp)
@@ -1120,32 +1151,21 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)
- /** ********* SSL Configuration **************/
- val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)
- val sslProtocol = getString(KafkaConfig.SslProtocolProp)
- val sslProvider = getString(KafkaConfig.SslProviderProp)
- val sslEnabledProtocols = getList(KafkaConfig.SslEnabledProtocolsProp)
- def sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp)
- def sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp)
- def sslKeystorePassword = getPassword(KafkaConfig.SslKeystorePasswordProp)
- def sslKeyPassword = getPassword(KafkaConfig.SslKeyPasswordProp)
- val sslTruststoreType = getString(KafkaConfig.SslTruststoreTypeProp)
- val sslTruststoreLocation = getString(KafkaConfig.SslTruststoreLocationProp)
- val sslTruststorePassword = getPassword(KafkaConfig.SslTruststorePasswordProp)
- val sslKeyManagerAlgorithm = getString(KafkaConfig.SslKeyManagerAlgorithmProp)
- val sslTrustManagerAlgorithm = getString(KafkaConfig.SslTrustManagerAlgorithmProp)
- val sslClientAuth = getString(KafkaConfig.SslClientAuthProp)
- val sslCipher = getList(KafkaConfig.SslCipherSuitesProp)
-
- /** ********* Sasl Configuration **************/
- val saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
- val saslEnabledMechanisms = getList(KafkaConfig.SaslEnabledMechanismsProp)
- val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp)
- val saslKerberosKinitCmd = getString(KafkaConfig.SaslKerberosKinitCmdProp)
- val saslKerberosTicketRenewWindowFactor = getDouble(KafkaConfig.SaslKerberosTicketRenewWindowFactorProp)
- val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp)
- val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp)
- val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp)
+ /** ********* SSL/SASL Configuration **************/
+ // Security configs may be overridden for listeners, so it is not safe to use the base values.
+ // Hence the base SSL/SASL configs are not fields of KafkaConfig; listener configs should be
+ // retrieved using KafkaConfig#valuesWithPrefixOverride.
+  /**
+   * Returns the SASL mechanisms enabled for `listenerName`, read from the values obtained via
+   * `valuesWithPrefixOverride` with the listener's config prefix.
+   * A missing (null) value yields an empty set.
+   */
+  private def saslEnabledMechanisms(listenerName: ListenerName): Set[String] = {
+    val listenerValues = valuesWithPrefixOverride(listenerName.configPrefix)
+    listenerValues.get(KafkaConfig.SaslEnabledMechanismsProp) match {
+      case null => Set.empty[String]
+      case mechanisms => mechanisms.asInstanceOf[util.List[String]].asScala.toSet
+    }
+  }
+
+  // These are methods rather than fields, so each call resolves the value again.
+  def interBrokerListenerName = {
+    val (listenerName, _) = getInterBrokerListenerNameAndSecurityProtocol
+    listenerName
+  }
+
+  def interBrokerSecurityProtocol = {
+    val (_, securityProtocol) = getInterBrokerListenerNameAndSecurityProtocol
+    securityProtocol
+  }
+
+  // SASL mechanism configured for inter-broker communication
+  def saslMechanismInterBrokerProtocol =
+    getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
/** ********* DelegationToken Configuration **************/
@@ -1155,6 +1175,14 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp)
val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
+ /** ********* Password encryption configuration for dynamic configs *********/
+ def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp)) // None when unset (config default is null)
+ def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp)) // None when unset (config default is null)
+ def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp) // defaults to Defaults.PasswordEncoderCipherAlgorithm
+ def passwordEncoderKeyFactoryAlgorithm = Option(getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp)) // None when unset (config default is null)
+ def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp) // defaults to Defaults.PasswordEncoderKeyLength, validated atLeast(8)
+ def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp) // defaults to Defaults.PasswordEncoderIterations, validated atLeast(1024)
+
/** ********* Quota Configuration **************/
val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
@@ -1170,9 +1198,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
def compressionType = getString(KafkaConfig.CompressionTypeProp)
- val listeners: Seq[EndPoint] = getListeners
- val advertisedListeners: Seq[EndPoint] = getAdvertisedListeners
- private[kafka] lazy val listenerSecurityProtocolMap = getListenerSecurityProtocolMap
def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
@@ -1203,7 +1228,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// If the user did not define listeners but did define host or port, let's use them in backward compatible way
// If none of those are defined, we default to PLAINTEXT://:9092
- private def getListeners: Seq[EndPoint] = {
+ def listeners: Seq[EndPoint] = {
Option(getString(KafkaConfig.ListenersProp)).map { listenerProp =>
CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap)
}.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
@@ -1212,14 +1237,14 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// If the user defined advertised listeners, we use those
// If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
// If none of these are defined, we'll use the listeners
- private def getAdvertisedListeners: Seq[EndPoint] = {
+ def advertisedListeners: Seq[EndPoint] = {
val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp)
if (advertisedListenersProp != null)
CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap)
else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null)
CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap)
else
- getListeners
+ listeners
}
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
@@ -1248,7 +1273,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
}
}
- private def getListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
+ def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
.map { case (listenerName, protocolName) =>
ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
@@ -1295,7 +1320,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
- require(!interBrokerUsesSasl || saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol),
+ require(!interBrokerUsesSasl || saslEnabledMechanisms(interBrokerListenerName).contains(saslMechanismInterBrokerProtocol),
s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker communication")
require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes,
s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}")
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 747a0df..0212181 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -44,6 +44,7 @@ import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramMechanism
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
@@ -236,8 +237,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
- tokenCache = new DelegationTokenCache(config.saslEnabledMechanisms)
- credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, tokenCache)
+ // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
+ // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
+ tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
+ credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup()
@@ -366,7 +369,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
}
- private def createBrokerInfo: BrokerInfo = {
+ private[server] def createBrokerInfo: BrokerInfo = {
val listeners = config.advertisedListeners.map { endpoint =>
if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
new file mode 100644
index 0000000..ff11e24
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.utils
+
+import java.nio.charset.StandardCharsets
+import java.security.{AlgorithmParameters, NoSuchAlgorithmException, SecureRandom}
+import java.security.spec.AlgorithmParameterSpec
+import javax.crypto.{Cipher, SecretKeyFactory}
+import javax.crypto.spec._
+
+import kafka.utils.PasswordEncoder._
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.utils.Base64
+
+import scala.collection.Map
+
+/**
+ * Keys of the CSV map in which an encoded password is persisted.
+ */
+object PasswordEncoder {
+  val KeyFactoryAlgorithmProp = "keyFactoryAlgorithm"
+  val CipherAlgorithmProp = "cipherAlgorithm"
+  val InitializationVectorProp = "initializationVector"
+  val KeyLengthProp = "keyLength"
+  val SaltProp = "salt"
+  val IterationsProp = "iterations"
+  val EncryptedPasswordProp = "encryptedPassword"
+  // Misspelled name kept as an alias so that existing references keep compiling;
+  // new code should use EncryptedPasswordProp.
+  val EncyrptedPasswordProp = EncryptedPasswordProp
+  val PasswordLengthProp = "passwordLength"
+}
+
+/**
+ * Password encoder and decoder implementation. Encoded passwords are persisted as a CSV map
+ * containing the encoded password in base64 along with the properties used for encryption.
+ *
+ * @param secret The secret used for encoding and decoding
+ * @param keyFactoryAlgorithm Key factory algorithm if configured. By default, PBKDF2WithHmacSHA512 is
+ * used if available, PBKDF2WithHmacSHA1 otherwise.
+ * @param cipherAlgorithm Cipher algorithm used for encoding.
+ * @param keyLength Key length used for encoding. This should be valid for the specified algorithms.
+ * @param iterations Iteration count used for encoding.
+ *
+ * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
+ * The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
+ *
+ */
+class PasswordEncoder(secret: Password,
+                      keyFactoryAlgorithm: Option[String],
+                      cipherAlgorithm: String,
+                      keyLength: Int,
+                      iterations: Int) extends Logging {
+
+  private val secureRandom = new SecureRandom
+  // Parameter encoder for the cipher algorithm configured for encoding.
+  private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm)
+
+  /**
+   * Encrypts `password` with a key derived from the secret and a freshly generated random salt.
+   *
+   * @param password the clear password to encode
+   * @return comma-separated `key:value` pairs holding the base64 encrypted password together with
+   *         the algorithms, key length, salt, iteration count and cipher parameters that were used
+   */
+  def encode(password: Password): String = {
+    val salt = new Array[Byte](256)
+    secureRandom.nextBytes(salt)
+    val cipher = Cipher.getInstance(cipherAlgorithm)
+    val keyFactory = secretKeyFactory(keyFactoryAlgorithm)
+    val keySpec = secretKeySpec(keyFactory, cipherAlgorithm, keyLength, salt, iterations)
+    cipher.init(Cipher.ENCRYPT_MODE, keySpec)
+    val encryptedPassword = cipher.doFinal(password.value.getBytes(StandardCharsets.UTF_8))
+    val encryptedMap = Map(
+      KeyFactoryAlgorithmProp -> keyFactory.getAlgorithm,
+      CipherAlgorithmProp -> cipherAlgorithm,
+      KeyLengthProp -> keyLength,
+      SaltProp -> base64Encode(salt),
+      IterationsProp -> iterations.toString,
+      EncyrptedPasswordProp -> base64Encode(encryptedPassword),
+      PasswordLengthProp -> password.value.length
+    ) ++ cipherParamsEncoder.toMap(cipher.getParameters)
+    encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",")
+  }
+
+  /**
+   * Decrypts a value produced by `encode`. Every setting needed for decryption (algorithms, key
+   * length, salt, iterations, cipher parameters) is read from the encoded value itself; only the
+   * secret comes from this instance.
+   *
+   * @param encodedPassword CSV map of `key:value` pairs as produced by `encode`
+   * @return the decoded password
+   * @throws ConfigException if decryption fails or the decoded length differs from the stored length
+   */
+  def decode(encodedPassword: String): Password = {
+    val params = CoreUtils.parseCsvMap(encodedPassword)
+    val keyFactoryAlg = params(KeyFactoryAlgorithmProp)
+    val cipherAlg = params(CipherAlgorithmProp)
+    val storedKeyLength = params(KeyLengthProp).toInt
+    val salt = base64Decode(params(SaltProp))
+    val storedIterations = params(IterationsProp).toInt
+    val encryptedPassword = base64Decode(params(EncyrptedPasswordProp))
+    val passwordLengthProp = params(PasswordLengthProp).toInt
+    val cipher = Cipher.getInstance(cipherAlg)
+    val keyFactory = secretKeyFactory(Some(keyFactoryAlg))
+    val keySpec = secretKeySpec(keyFactory, cipherAlg, storedKeyLength, salt, storedIterations)
+    // Build the parameter spec for the algorithm stored with the password, not for the currently
+    // configured one, so that values stay decodable after the cipher algorithm config changes.
+    cipher.init(Cipher.DECRYPT_MODE, keySpec, cipherParamsInstance(cipherAlg).toParameterSpec(params))
+    val password = try {
+      val decrypted = cipher.doFinal(encryptedPassword)
+      new String(decrypted, StandardCharsets.UTF_8)
+    } catch {
+      case e: Exception => throw new ConfigException("Password could not be decoded", e)
+    }
+    if (password.length != passwordLengthProp) // Sanity check
+      throw new ConfigException("Password could not be decoded, sanity check of length failed")
+    new Password(password)
+  }
+
+  /**
+   * Returns the requested key factory, or PBKDF2WithHmacSHA512 when none is requested, falling
+   * back to PBKDF2WithHmacSHA1 if that algorithm is not available.
+   */
+  private def secretKeyFactory(keyFactoryAlg: Option[String]): SecretKeyFactory = {
+    keyFactoryAlg match {
+      case Some(algorithm) => SecretKeyFactory.getInstance(algorithm)
+      case None =>
+        try {
+          SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
+        } catch {
+          case _: NoSuchAlgorithmException => SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1")
+        }
+    }
+  }
+
+  /**
+   * Derives the cipher key from the secret using the given key factory, salt, iteration count and
+   * key length. The key algorithm is the part of `cipherAlg` before the first '/', or all of it.
+   */
+  private def secretKeySpec(keyFactory: SecretKeyFactory,
+                            cipherAlg: String,
+                            keyLength: Int,
+                            salt: Array[Byte], iterations: Int): SecretKeySpec = {
+    val separatorIndex = cipherAlg.indexOf('/')
+    val algorithm = if (separatorIndex > 0) cipherAlg.substring(0, separatorIndex) else cipherAlg
+    val keySpec = new PBEKeySpec(secret.value.toCharArray, salt, iterations, keyLength)
+    try {
+      new SecretKeySpec(keyFactory.generateSecret(keySpec).getEncoded, algorithm)
+    } finally {
+      // Wipe the copy of the secret held by the key spec once the key has been derived.
+      keySpec.clearPassword()
+    }
+  }
+
+  private def base64Encode(bytes: Array[Byte]): String = Base64.encoder.encodeToString(bytes)
+
+  private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.decoder.decode(encoded)
+
+  /** Selects the GCM parameter encoder for `AES/GCM/...` algorithms and the IV encoder otherwise. */
+  private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
+    val aesPattern = "AES/(.*)/.*".r
+    cipherAlgorithm match {
+      case aesPattern("GCM") => new GcmParamsEncoder
+      case _ => new IvParamsEncoder
+    }
+  }
+
+  /** Converts cipher parameters to and from the string map persisted with the password. */
+  private trait CipherParamsEncoder {
+    def toMap(cipher: AlgorithmParameters): Map[String, String]
+    def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec
+  }
+
+  /** Persists only the initialization vector. */
+  private class IvParamsEncoder extends CipherParamsEncoder {
+    def toMap(cipherParams: AlgorithmParameters): Map[String, String] = {
+      if (cipherParams != null) {
+        val ivSpec = cipherParams.getParameterSpec(classOf[IvParameterSpec])
+        Map(InitializationVectorProp -> base64Encode(ivSpec.getIV))
+      } else
+        throw new IllegalStateException("Could not determine initialization vector for cipher")
+    }
+    def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = {
+      new IvParameterSpec(base64Decode(paramMap(InitializationVectorProp)))
+    }
+  }
+
+  /** Persists the initialization vector and the authentication tag length. */
+  private class GcmParamsEncoder extends CipherParamsEncoder {
+    def toMap(cipherParams: AlgorithmParameters): Map[String, String] = {
+      if (cipherParams != null) {
+        val spec = cipherParams.getParameterSpec(classOf[GCMParameterSpec])
+        Map(InitializationVectorProp -> base64Encode(spec.getIV),
+          "authenticationTagLength" -> spec.getTLen.toString)
+      } else
+        throw new IllegalStateException("Could not determine initialization vector for cipher")
+    }
+    def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = {
+      new GCMParameterSpec(paramMap("authenticationTagLength").toInt, base64Decode(paramMap(InitializationVectorProp)))
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index d683a8d..b545455 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -82,6 +82,13 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}")
}
+ /**
+  * Overwrites the data stored at the broker's path with the serialized `brokerInfo`,
+  * without a version check.
+  *
+  * @param brokerInfo the broker information to store
+  */
+ def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
+   val brokerIdPath = brokerInfo.path
+   val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
+   val response = retryRequestUntilConnected(setDataRequest)
+   // Surface a failed write instead of logging it as a successful update.
+   response.maybeThrow()
+   info(s"Updated broker ${brokerInfo.broker.id} at path $brokerIdPath with addresses: ${brokerInfo.broker.endPoints}")
+ }
+
/**
* Gets topic partition states for the given partitions.
* @param partitions the partitions for which we want to get states.
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 6f45bca..867e03d 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -430,13 +430,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name)
assertFalse(listenerSecurityProtocolMap.isDefault)
assertFalse(listenerSecurityProtocolMap.isSensitive)
- assertTrue(listenerSecurityProtocolMap.isReadOnly)
+ assertFalse(listenerSecurityProtocolMap.isReadOnly)
val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp)
assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name)
assertNull(truststorePassword.value)
assertFalse(truststorePassword.isDefault)
assertTrue(truststorePassword.isSensitive)
- assertTrue(truststorePassword.isReadOnly)
+ assertFalse(truststorePassword.isReadOnly)
val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp)
assertEquals(servers(1).config.compressionType.toString, compressionType.value)
assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index f459252..273b247 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -59,12 +59,7 @@ trait SaslSetup {
case _ => false
})
if (hasKerberos) {
- val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles()
- kdc = new MiniKdc(kdcConf, workDir)
- kdc.start()
- kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
- kdc.createPrincipal(clientKeytabFile,
- JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
+ initializeKerberos()
}
writeJaasConfigurationToFile(jaasSections)
val hasZk = jaasSections.exists(_.modules.exists {
@@ -75,6 +70,15 @@ trait SaslSetup {
System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
}
+ /**
+  * Starts a MiniKdc and creates the Kafka server principal and the two Kafka client principals
+  * in the server and client keytab files respectively.
+  */
+ protected def initializeKerberos(): Unit = {
+   // Tuple of (server keytab file, client keytab file)
+   val keytabs = maybeCreateEmptyKeytabFiles()
+   kdc = new MiniKdc(kdcConf, workDir)
+   kdc.start()
+   val serverPrincipal = JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost"
+   kdc.createPrincipal(keytabs._1, serverPrincipal)
+   kdc.createPrincipal(keytabs._2,
+     JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
+ }
+
/** Return a tuple with the path to the server keytab file and client keytab file */
protected def maybeCreateEmptyKeytabFiles(): (File, File) = {
if (serverKeytabFile.isEmpty)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 1224274..b7f0ae8 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -18,28 +18,29 @@
package kafka.server
-import java.io.{Closeable, File, FileOutputStream, FileWriter}
+import java.io.{Closeable, File, FileWriter}
import java.nio.file.{Files, StandardCopyOption}
import java.lang.management.ManagementFactory
import java.util
import java.util.{Collections, Properties}
-import java.util.concurrent.{ConcurrentLinkedQueue, ExecutionException, TimeUnit}
+import java.util.concurrent._
import javax.management.ObjectName
import kafka.admin.ConfigCommand
-import kafka.api.SaslSetup
-import kafka.log.LogConfig
+import kafka.api.{KafkaSasl, SaslSetup}
import kafka.coordinator.group.OffsetConfig
+import kafka.log.LogConfig
import kafka.message.ProducerCompressionCodec
-import kafka.utils.{ShutdownableThread, TestUtils}
+import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
+import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition}
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigException, ConfigResource, SslConfigs}
import org.apache.kafka.common.config.SslConfigs._
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
@@ -55,6 +56,7 @@ import org.junit.{After, Before, Test}
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
+import scala.collection.Seq
object DynamicBrokerReconfigurationTest {
val SecureInternal = "INTERNAL"
@@ -65,12 +67,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
import DynamicBrokerReconfigurationTest._
- private var servers = new ArrayBuffer[KafkaServer]
+ private val servers = new ArrayBuffer[KafkaServer]
private val numServers = 3
private val producers = new ArrayBuffer[KafkaProducer[String, String]]
private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
private val adminClients = new ArrayBuffer[AdminClient]()
private val clientThreads = new ArrayBuffer[ShutdownableThread]()
+ private val executors = new ArrayBuffer[ExecutorService]
private val topic = "testtopic"
private val kafkaClientSaslMechanism = "PLAIN"
@@ -95,10 +98,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL")
props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
+ props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
props.put(KafkaConfig.LogSegmentBytesProp, "2000")
props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000")
+ props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
+ props.put(KafkaConfig.PasswordEncoderOldSecretProp, "old-dynamic-config-secret")
props ++= sslProperties1
addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
@@ -127,8 +133,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
clientThreads.foreach(_.interrupt())
clientThreads.foreach(_.initiateShutdown())
clientThreads.foreach(_.join(5 * 1000))
- producers.foreach(_.close())
- consumers.foreach(_.close())
+ executors.foreach(_.shutdownNow())
+ producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
+ consumers.foreach(_.close(0, TimeUnit.MILLISECONDS))
adminClients.foreach(_.close())
TestUtils.shutdownServers(servers)
super.tearDown()
@@ -514,13 +521,217 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
+ /**
+  * Switches the advertised listener host from "localhost" to an invalid host and back,
+  * checking that a producer fails while the invalid host is advertised, that produce/consume
+  * work after it is restored, and that updating the inter-broker listener name is rejected
+  * with an InvalidRequestException.
+  */
+ @Test
+ def testAdvertisedListenerUpdate(): Unit = {
+ val adminClient = adminClients.head
+ val externalAdminClient = createAdminClient(SecurityProtocol.SASL_SSL, SecureExternal)
+
+ // Ensure connections are made to brokers before external listener is made inaccessible
+ describeConfig(externalAdminClient)
+
+ // Replace "localhost" with an invalid host in the advertised listener.
+ // Any address other than localhost is sufficient to fail (either connection or host name verification failure)
+ val invalidHost = "192.168.0.1"
+ alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost)
+
+ // Verify that producer connections fail since advertised listener is invalid
+ val bootstrap = bootstrapServers.replaceAll(invalidHost, "localhost") // allow bootstrap connection to succeed
+ val producer1 = createProducer(trustStoreFile1, retries = 0, bootstrap = bootstrap)
+
+ // The returned future is checked at the end of the test to confirm the send never completed.
+ val sendFuture = verifyConnectionFailure(producer1)
+
+ // Restore "localhost" in the advertised listener.
+ alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
+
+ // Verify that produce/consume work now
+ val producer = createProducer(trustStoreFile1, retries = 0)
+ val consumer = createConsumer("group2", trustStoreFile1, topic)
+ verifyProduceConsume(producer, consumer, 10, topic)
+
+ // Verify that updating the inter-broker listener is rejected and the configured value is unchanged
+ val props = new Properties
+ props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
+ try {
+ reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
+ fail("Inter-broker listener cannot be dynamically updated")
+ } catch {
+ case e: ExecutionException =>
+ assertTrue(s"Unexpected exception ${e.getCause}", e.getCause.isInstanceOf[InvalidRequestException])
+ servers.foreach(server => assertEquals(SecureInternal, server.config.interBrokerListenerName.value))
+ }
+
+ // Verify that the other send did not complete
+ verifyTimeout(sendFuture)
+ }
+
+ /**
+  * Adds an SSL listener, restarts every broker after re-encoding its stored password configs
+  * with the old password encoder secret, then verifies the listener still works and removes it.
+  */
+ @Test
+ def testAddRemoveSslListener(): Unit = {
+ verifyAddListener("SSL", SecurityProtocol.SSL, Seq.empty)
+
+ // Restart servers and check secret rotation
+ servers.foreach(_.shutdown())
+ servers.foreach(_.awaitShutdown())
+ adminClients.foreach(_.close())
+ adminClients.clear()
+
+ // All passwords are currently encoded with password.encoder.secret. Encode with password.encoder.old.secret
+ // and update ZK. When each server is started, it should decode using password.encoder.old.secret and update
+ // ZK with newly encoded values using password.encoder.secret.
+ servers.foreach { server =>
+ val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
+ val config = server.config
+ val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
+ val oldSecret = config.passwordEncoderOldSecret.getOrElse(throw new IllegalStateException("Password encoder old secret not configured"))
+ // NOTE(review): filterKeys appears to return a lazy view over `props` (Scala 2.11/2.12), and `props`
+ // is mutated in the loop below, so `value` in the later assertions may be the re-encoded value rather
+ // than the original one; the test also passes vacuously if no key matches. Confirm and snapshot if so.
+ val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.DynamicPasswordConfigs.contains)
+ // Decodes with the current secret ...
+ val passwordDecoder = new PasswordEncoder(secret,
+ config.passwordEncoderKeyFactoryAlgorithm,
+ config.passwordEncoderCipherAlgorithm,
+ config.passwordEncoderKeyLength,
+ config.passwordEncoderIterations)
+ // ... and re-encodes with the old secret.
+ val passwordEncoder = new PasswordEncoder(oldSecret,
+ config.passwordEncoderKeyFactoryAlgorithm,
+ config.passwordEncoderCipherAlgorithm,
+ config.passwordEncoderKeyLength,
+ config.passwordEncoderIterations)
+ passwordConfigs.foreach { case (name, value) =>
+ val decoded = passwordDecoder.decode(value).value
+ props.put(name, passwordEncoder.encode(new Password(decoded)))
+ }
+ val brokerId = server.config.brokerId
+ adminZkClient.changeBrokerConfig(Seq(brokerId), props)
+ val updatedProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
+ passwordConfigs.foreach { case (name, value) => assertNotEquals(value, updatedProps.get(name)) }
+
+ server.startup()
+ // Retry the check for up to 10000 ms after the restart.
+ TestUtils.retry(10000) {
+ val newProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
+ passwordConfigs.foreach { case (name, value) => assertEquals(value, newProps.get(name)) }
+ }
+ }
+
+ verifyListener(SecurityProtocol.SSL, None)
+ // The admin clients were closed above; create a new one before removing the listener.
+ createAdminClient(SecurityProtocol.SSL, SecureInternal)
+ verifyRemoveListener("SSL", SecurityProtocol.SSL, Seq.empty)
+ }
+
+ /**
+  * Adds and then removes a SASL_PLAINTEXT listener that uses the GSSAPI mechanism.
+  * The SASL_SSL variants are currently commented out.
+  */
+ @Test
+ def testAddRemoveSaslListeners(): Unit = {
+ createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
+ createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
+ initializeKerberos()
+
+ //verifyAddListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN"))
+ verifyAddListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
+ //verifyRemoveListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN"))
+ verifyRemoveListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
+ }
+
+ /**
+  * Appends a listener named `listenerName` on localhost with an ephemeral port to the per-broker
+  * config, waits until every server reports the extra listener and has it bound, then verifies
+  * produce/consume through it, once per SASL mechanism or once without a mechanism.
+  */
+ private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
+                               saslMechanisms: Seq[String]): Unit = {
+   val brokerConfig = servers.head.config
+   val expectedListenerCount = brokerConfig.listeners.size + 1
+
+   val currentListeners = brokerConfig.listeners
+     .map(endPoint => s"${endPoint.listenerName.value}://${endPoint.host}:${endPoint.port}")
+     .mkString(",")
+   val updatedListeners = s"$currentListeners,$listenerName://localhost:0"
+
+   val currentProtocolMap = brokerConfig.listenerSecurityProtocolMap
+     .map { case (name, protocol) => s"${name.value}:${protocol.name}" }
+     .mkString(",")
+   val updatedProtocolMap = s"$currentProtocolMap,$listenerName:${securityProtocol.name}"
+
+   val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerConfig.brokerId.toString)
+   props.put(KafkaConfig.ListenersProp, updatedListeners)
+   props.put(KafkaConfig.ListenerSecurityProtocolMapProp, updatedProtocolMap)
+
+   // SASL properties first, then SSL properties, for the protocols that need them
+   val usesSasl = securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL
+   val usesSsl = securityProtocol == SecurityProtocol.SSL || securityProtocol == SecurityProtocol.SASL_SSL
+   if (usesSasl)
+     addListenerPropsSasl(listenerName, saslMechanisms, props)
+   if (usesSsl)
+     addListenerPropsSsl(listenerName, props)
+
+   alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
+
+   // A listener that is not yet known to the socket server counts as "not bound"
+   def listenerBound(server: KafkaServer): Boolean = {
+     try {
+       server.socketServer.boundPort(new ListenerName(listenerName)) > 0
+     } catch {
+       case _: Exception => false
+     }
+   }
+
+   TestUtils.waitUntilTrue(() => servers.forall(_.config.listeners.size == expectedListenerCount),
+     "Listener config not updated")
+   TestUtils.waitUntilTrue(() => servers.forall(listenerBound), "Listener not created")
+
+   if (saslMechanisms.isEmpty)
+     verifyListener(securityProtocol, None)
+   else
+     saslMechanisms.foreach(mechanism => verifyListener(securityProtocol, Some(mechanism)))
+ }
+
+ /**
+  * Removes the listener named `listenerName` from the per-broker config and verifies that
+  * clients connected through it stop working while clients on other listeners still work.
+  *
+  * @param listenerName     name of the listener to remove
+  * @param securityProtocol security protocol of that listener, used to create the clients
+  * @param saslMechanisms   SASL mechanisms of the listener; the first one (if any) is used by the clients
+  */
+ private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol,
+                                  saslMechanisms: Seq[String]): Unit = {
+   val saslMechanism = saslMechanisms.headOption.getOrElse("")
+   val producer1 = createProducer(listenerName, securityProtocol, saslMechanism)
+   val consumer1 = createConsumer(listenerName, securityProtocol, saslMechanism,
+     s"remove-listener-group-$securityProtocol")
+   verifyProduceConsume(producer1, consumer1, numRecords = 10, topic)
+   // send another message to check consumer later
+   producer1.send(new ProducerRecord(topic, "key", "value")).get(100, TimeUnit.MILLISECONDS)
+
+   val config = servers.head.config
+   val existingListenerCount = config.listeners.size
+   // Filter on the listener name being removed (not on the security protocol name), so that
+   // listeners whose name differs from their protocol are handled correctly as well.
+   val listeners = config.listeners
+     .filter(e => e.listenerName.value != listenerName)
+     .map(e => s"${e.listenerName.value}://${e.host}:${e.port}")
+     .mkString(",")
+   val listenerMap = config.listenerSecurityProtocolMap
+     .filterKeys(name => name.value != listenerName)
+     .map { case (name, protocol) => s"${name.value}:${protocol.name}" }
+     .mkString(",")
+
+   val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString)
+   // Drop every config carrying the removed listener's prefix
+   val listenerProps = props.asScala.keySet.filter(_.startsWith(new ListenerName(listenerName).configPrefix))
+   listenerProps.foreach(props.remove)
+   props.put(KafkaConfig.ListenersProp, listeners)
+   props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
+   alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
+
+   TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1),
+     "Listeners not updated")
+
+   // Test that connections using deleted listener don't work
+   val producerFuture = verifyConnectionFailure(producer1)
+   val consumerFuture = verifyConnectionFailure(consumer1)
+
+   // Test that other listeners still work
+   val producer2 = createProducer(trustStoreFile1, retries = 0)
+   val consumer2 = createConsumer(s"remove-listener-group2-$securityProtocol", trustStoreFile1, topic, autoOffsetReset = "latest")
+   verifyProduceConsume(producer2, consumer2, numRecords = 10, topic)
+
+   // Verify that producer/consumer using old listener don't work
+   verifyTimeout(producerFuture)
+   verifyTimeout(consumerFuture)
+ }
+
+ /**
+  * Verifies produce/consume through the listener named after `securityProtocol`, using
+  * `saslMechanism` when one is given and an empty mechanism name otherwise.
+  */
+ private def verifyListener(securityProtocol: SecurityProtocol, saslMechanism: Option[String]): Unit = {
+   val mechanismName = saslMechanism.getOrElse("")
+   val listener = securityProtocol.name
+   val groupId = s"add-listener-group-$securityProtocol-$mechanismName"
+   val listenerProducer = createProducer(listener, securityProtocol, mechanismName)
+   val listenerConsumer = createConsumer(listener, securityProtocol, mechanismName, groupId)
+   verifyProduceConsume(listenerProducer, listenerConsumer, numRecords = 10, topic)
+ }
+
+ /** Bootstrap servers string of all servers for the external listener. */
+ private def bootstrapServers: String = {
+   val externalListener = new ListenerName(SecureExternal)
+   TestUtils.bootstrapServers(servers, externalListener)
+ }
+
private def createProducer(trustStore: File, retries: Int,
- clientId: String = "test-producer"): KafkaProducer[String, String] = {
- val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
+ clientId: String = "test-producer",
+ bootstrap: String = bootstrapServers,
+ securityProtocol: SecurityProtocol = SecurityProtocol.SASL_SSL): KafkaProducer[String, String] = {
val propsOverride = new Properties
propsOverride.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)
+ propsOverride.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")
val producer = TestUtils.createNewProducer(
- bootstrapServers,
+ bootstrap,
acks = -1,
retries = retries,
securityProtocol = SecurityProtocol.SASL_SSL,
@@ -533,17 +744,72 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
producer
}
- private def createConsumer(groupId: String, trustStore: File, topic: String = topic):KafkaConsumer[String, String] = {
- val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
+ private def createConsumer(groupId: String, trustStore: File,
+ topic: String = topic,
+ bootstrap: String = bootstrapServers,
+ securityProtocol: SecurityProtocol = SecurityProtocol.SASL_SSL,
+ autoOffsetReset: String = "earliest"):KafkaConsumer[String, String] = {
+ val propsOverride = new Properties
+ propsOverride.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")
val consumer = TestUtils.createNewConsumer(
- bootstrapServers,
+ bootstrap,
groupId,
- securityProtocol = SecurityProtocol.SASL_SSL,
+ autoOffsetReset = autoOffsetReset,
+ securityProtocol = securityProtocol,
trustStoreFile = Some(trustStore),
saslProperties = Some(clientSaslProps),
keyDeserializer = new StringDeserializer,
valueDeserializer = new StringDeserializer)
consumer.subscribe(Collections.singleton(topic))
+ if (autoOffsetReset == "latest") {
+ do {
+ consumer.poll(1)
+ } while (consumer.assignment.isEmpty)
+ }
+ consumers += consumer
+ consumer
+ }
+
+ private def clientProps(securityProtocol: SecurityProtocol, saslMechanism: String): Properties = {
+ val props = new Properties
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
+ props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")
+ val saslProps = if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
+ Some(kafkaClientSaslProperties(saslMechanism, dynamicJaasConfig = true))
+ } else
+ None
+ val securityProps = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol,
+ Some(trustStoreFile1), "client", TestUtils.SslCertificateCn, saslProps)
+ props ++= securityProps
+ props
+ }
+
+ private def createProducer(listenerName: String, securityProtocol: SecurityProtocol,
+ saslMechanism: String): KafkaProducer[String, String] = {
+ val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
+ val producer = TestUtils.createNewProducer(bootstrapServers,
+ acks = -1, retries = 0,
+ securityProtocol = securityProtocol,
+ keySerializer = new StringSerializer,
+ valueSerializer = new StringSerializer,
+ props = Some(clientProps(securityProtocol, saslMechanism)))
+ producers += producer
+ producer
+ }
+
+ private def createConsumer(listenerName: String, securityProtocol: SecurityProtocol,
+ saslMechanism: String, group: String): KafkaConsumer[String, String] = {
+ val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
+ val consumer = TestUtils.createNewConsumer(bootstrapServers, group,
+ autoOffsetReset = "latest",
+ securityProtocol = securityProtocol,
+ keyDeserializer = new StringDeserializer,
+ valueDeserializer = new StringDeserializer,
+ props = Some(clientProps(securityProtocol, saslMechanism)))
+ consumer.subscribe(Collections.singleton(topic))
+ do {
+ consumer.poll(1)
+ } while (consumer.assignment.isEmpty)
consumers += consumer
consumer
}
@@ -552,6 +818,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
val config = new util.HashMap[String, Object]
val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+ config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, Some(trustStoreFile1), Some(clientSaslProps))
securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
@@ -563,7 +830,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private def verifyProduceConsume(producer: KafkaProducer[String, String],
consumer: KafkaConsumer[String, String],
numRecords: Int,
- topic: String = topic): Unit = {
+ topic: String): Unit = {
val producerRecords = (1 to numRecords).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
@@ -639,6 +906,39 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
}
+ private def serverEndpoints(adminClient: AdminClient): String = {
+ val nodes = adminClient.describeCluster().nodes().get
+ nodes.asScala.map { node =>
+ s"${node.host}:${node.port}"
+ }.mkString(",")
+ }
+
+ private def alterAdvertisedListener(adminClient: AdminClient, externalAdminClient: AdminClient, oldHost: String, newHost: String): Unit = {
+ val configs = servers.map { server =>
+ val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
+ val newListeners = server.config.advertisedListeners.map { e =>
+ if (e.listenerName.value == SecureExternal)
+ s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}"
+ else
+ s"${e.listenerName.value}://${e.host}:${server.boundPort(e.listenerName)}"
+ }.mkString(",")
+ val configEntry = new ConfigEntry(KafkaConfig.AdvertisedListenersProp, newListeners)
+ (resource, new Config(Collections.singleton(configEntry)))
+ }.toMap.asJava
+ adminClient.alterConfigs(configs).all.get
+ servers.foreach { server =>
+ TestUtils.retry(10000) {
+ val externalListener = server.config.advertisedListeners.find(_.listenerName.value == SecureExternal)
+ .getOrElse(throw new IllegalStateException("External listener not found"))
+ assertTrue("Config not updated", externalListener.host == newHost)
+ }
+ }
+ val (endpoints, altered) = TestUtils.computeUntilTrue(serverEndpoints(externalAdminClient)) { endpoints =>
+ !endpoints.contains(oldHost)
+ }
+ assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered)
+ }
+
private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
@@ -701,12 +1001,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
adminZkClient.changeBrokerConfig(brokers, keystoreProps)
}
- private def waitForKeystore(sslProperties: Properties, maxWaitMs: Long = 10000): Unit = {
- waitForConfig(new ListenerName(SecureExternal).configPrefix + SSL_KEYSTORE_LOCATION_CONFIG,
- sslProperties.getProperty(SSL_KEYSTORE_LOCATION_CONFIG), maxWaitMs)
-
- }
-
private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
servers.foreach { server => waitForConfigOnServer(server, propName, propValue, maxWaitMs) }
}
@@ -774,6 +1068,64 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}
+ private def verifyConnectionFailure(producer: KafkaProducer[String, String]): Future[_] = {
+ val executor = Executors.newSingleThreadExecutor
+ executors += executor
+ val future = executor.submit(new Runnable() {
+ def run() {
+ producer.send(new ProducerRecord(topic, "key", "value")).get
+ }
+ })
+ verifyTimeout(future)
+ future
+ }
+
+ private def verifyConnectionFailure(consumer: KafkaConsumer[String, String]): Future[_] = {
+ val executor = Executors.newSingleThreadExecutor
+ executors += executor
+ val future = executor.submit(new Runnable() {
+ def run() {
+ assertEquals(0, consumer.poll(100).count)
+ }
+ })
+ verifyTimeout(future)
+ future
+ }
+
+ private def verifyTimeout(future: Future[_]): Unit = {
+ try {
+ future.get(100, TimeUnit.MILLISECONDS)
+ fail("Operation should not have completed")
+ } catch {
+ case _: TimeoutException => // expected exception
+ }
+ }
+
+ private def addListenerPropsSsl(listenerName: String, props: Properties): Unit = {
+ val prefix = new ListenerName(listenerName).configPrefix
+ sslProperties1.keySet.asScala.foreach { name =>
+ val value = sslProperties1.get(name)
+ val valueStr = value match {
+ case password: Password => password.value
+ case list: util.List[_] => list.asScala.map(_.toString).mkString(",")
+ case _ => value.toString
+ }
+ props.put(s"$prefix$name", valueStr)
+ }
+ }
+
+ private def addListenerPropsSasl(listener: String, mechanisms: Seq[String], props: Properties): Unit = {
+ val listenerName = new ListenerName(listener)
+ val prefix = listenerName.configPrefix
+ props.put(prefix + KafkaConfig.SaslEnabledMechanismsProp, mechanisms.mkString(","))
+ props.put(prefix + KafkaConfig.SaslKerberosServiceNameProp, "kafka")
+ mechanisms.foreach { mechanism =>
+ val jaasSection = jaasSections(Seq(mechanism), None, KafkaSasl, "").head
+ val jaasConfig = jaasSection.modules.head.toString
+ props.put(listenerName.saslMechanismConfigPrefix(mechanism) + KafkaConfig.SaslJaasConfigProp, jaasConfig)
+ }
+ }
+
private class ProducerThread(clientId: String, retries: Int) extends ShutdownableThread(clientId, isInterruptible = false) {
private val producer = createProducer(trustStoreFile1, retries, clientId)
@volatile var sent = 0
@@ -877,8 +1229,10 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
Set(PollingIntervalProp).asJava
}
- override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
- configs.get(PollingIntervalProp).toString.toInt > 0
+ override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+ val pollingInterval = configs.get(PollingIntervalProp).toString
+ if (configs.get(PollingIntervalProp).toString.toInt <= 0)
+ throw new ConfigException(s"Invalid polling interval $pollingInterval")
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
index 666ba4b..f2c6753 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
@@ -21,25 +21,23 @@ import java.util.Properties
import kafka.utils.JaasTestUtils
import kafka.utils.JaasTestUtils.JaasSection
-import org.apache.kafka.common.network.ListenerName
class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
import MultipleListenersWithSameSecurityProtocolBaseTest._
- override def saslProperties(listenerName: ListenerName): Properties = {
- listenerName.value match {
- case SecureInternal => kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
- case SecureExternal => kafkaClientSaslProperties(GssApi, dynamicJaasConfig = true)
- case _ => throw new IllegalArgumentException(s"Unexpected listener name $listenerName")
- }
+ override def staticJaasSections: Seq[JaasSection] = {
+ val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles()
+ JaasTestUtils.zkSections :+
+ JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", kafkaServerSaslMechanisms(SecureExternal), Some(serverKeytabFile))
}
- override def jaasSections: Seq[JaasSection] = {
- val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles()
- JaasTestUtils.zkSections ++ Seq(
- JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", Seq(GssApi), Some(serverKeytabFile)),
- JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(Plain), None)
- )
+ override protected def dynamicJaasSections: Properties = {
+ val props = new Properties
+ kafkaServerSaslMechanisms(SecureInternal).foreach { mechanism =>
+ addDynamicJaasSection(props, SecureInternal, mechanism,
+ JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(mechanism), None))
+ }
+ props
}
}
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
index f3e1b8b..10df84e 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
@@ -26,12 +26,9 @@ import org.apache.kafka.common.network.ListenerName
class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
- import MultipleListenersWithSameSecurityProtocolBaseTest._
+ override def staticJaasSections: Seq[JaasSection] =
+ jaasSections(kafkaServerSaslMechanisms.values.flatMap(identity).toSeq, Some(kafkaClientSaslMechanism), Both)
- override def saslProperties(listenerName: ListenerName): Properties =
- kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
-
- override def jaasSections: Seq[JaasSection] =
- jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both)
+ override protected def dynamicJaasSections: Properties = new Properties
}
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index da8437e..50ebaa4 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -19,13 +19,13 @@
package kafka.server
import java.io.File
-import java.util.{Collections, Properties}
+import java.util.{Collections, Objects, Properties}
import java.util.concurrent.TimeUnit
import kafka.api.SaslSetup
import kafka.coordinator.group.OffsetConfig
import kafka.utils.JaasTestUtils.JaasSection
-import kafka.utils.TestUtils
+import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.utils.Implicits._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
@@ -55,18 +55,20 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
private val trustStoreFile = File.createTempFile("truststore", ".jks")
private val servers = new ArrayBuffer[KafkaServer]
- private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]()
- private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]()
+ private val producers = mutable.Map[ClientMetadata, KafkaProducer[Array[Byte], Array[Byte]]]()
+ private val consumers = mutable.Map[ClientMetadata, KafkaConsumer[Array[Byte], Array[Byte]]]()
protected val kafkaClientSaslMechanism = Plain
- protected val kafkaServerSaslMechanisms = List(GssApi, Plain)
+ protected val kafkaServerSaslMechanisms = Map(
+ SecureExternal -> Seq("SCRAM-SHA-256", GssApi),
+ SecureInternal -> Seq(Plain, "SCRAM-SHA-512"))
- protected def saslProperties(listenerName: ListenerName): Properties
- protected def jaasSections: Seq[JaasSection]
+ protected def staticJaasSections: Seq[JaasSection]
+ protected def dynamicJaasSections: Properties
@Before
override def setUp(): Unit = {
- startSasl(jaasSections)
+ startSasl(staticJaasSections)
super.setUp()
// 2 brokers so that we can test that the data propagates correctly via UpdateMetadadaRequest
val numServers = 2
@@ -82,8 +84,12 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
props.put(KafkaConfig.InterBrokerListenerNameProp, Internal)
props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism)
- props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
+ props.put(s"${new ListenerName(SecureInternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}",
+ kafkaServerSaslMechanisms(SecureInternal).mkString(","))
+ props.put(s"${new ListenerName(SecureExternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}",
+ kafkaServerSaslMechanisms(SecureExternal).mkString(","))
props.put(KafkaConfig.SaslKerberosServiceNameProp, "kafka")
+ props ++= dynamicJaasSections
props ++= TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")
@@ -107,26 +113,37 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
+ createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
+
servers.head.config.listeners.foreach { endPoint =>
val listenerName = endPoint.listenerName
- TestUtils.createTopic(zkClient, listenerName.value, 2, 2, servers)
-
val trustStoreFile =
if (TestUtils.usesSslTransportLayer(endPoint.securityProtocol)) Some(this.trustStoreFile)
else None
- val saslProps =
- if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) Some(saslProperties(listenerName))
- else None
-
val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
- producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1,
- securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
+ def addProducerConsumer(listenerName: ListenerName, mechanism: String, saslProps: Option[Properties]): Unit = {
+
+ val topic = s"${listenerName.value}${producers.size}"
+ TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+ val clientMetadata = ClientMetadata(listenerName, mechanism, topic)
+
+ producers(clientMetadata) = TestUtils.createNewProducer(bootstrapServers, acks = -1,
+ securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
+
+ consumers(clientMetadata) = TestUtils.createNewConsumer(bootstrapServers, groupId = clientMetadata.toString,
+ securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
+ }
- consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value,
- securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
+ if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) {
+ kafkaServerSaslMechanisms(endPoint.listenerName.value).foreach { mechanism =>
+ addProducerConsumer(listenerName, mechanism, Some(kafkaClientSaslProperties(mechanism, dynamicJaasConfig = true)))
+ }
+ } else {
+ addProducerConsumer(listenerName, "", saslProps = None)
+ }
}
}
@@ -145,18 +162,34 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
*/
@Test
def testProduceConsume(): Unit = {
- producers.foreach { case (listenerName, producer) =>
- val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes,
+ producers.foreach { case (clientMetadata, producer) =>
+ val producerRecords = (1 to 10).map(i => new ProducerRecord(clientMetadata.topic, s"key$i".getBytes,
s"value$i".getBytes))
producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
- val consumer = consumers(listenerName)
- consumer.subscribe(Collections.singleton(listenerName.value))
+ val consumer = consumers(clientMetadata)
+ consumer.subscribe(Collections.singleton(clientMetadata.topic))
val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
TestUtils.waitUntilTrue(() => {
records ++= consumer.poll(50).asScala
records.size == producerRecords.size
- }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records")
+ }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records with mechanism ${clientMetadata.saslMechanism}")
+ }
+ }
+
+ protected def addDynamicJaasSection(props: Properties, listener: String, mechanism: String, jaasSection: JaasSection): Unit = {
+ val listenerName = new ListenerName(listener)
+ val prefix = listenerName.saslMechanismConfigPrefix(mechanism)
+ val jaasConfig = jaasSection.modules.head.toString
+ props.put(s"${prefix}${KafkaConfig.SaslJaasConfigProp}", jaasConfig)
+ }
+
+ case class ClientMetadata(val listenerName: ListenerName, val saslMechanism: String, topic: String) {
+ override def hashCode: Int = Objects.hash(listenerName, saslMechanism)
+ override def equals(obj: Any): Boolean = obj match {
+ case other: ClientMetadata => listenerName == other.listenerName && saslMechanism == other.saslMechanism && topic == other.topic
+ case _ => false
}
+ override def toString: String = s"${listenerName.value}:$saslMechanism:$topic"
}
}
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index c934c4a..18be167 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -87,13 +87,13 @@ class KafkaTest {
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "ssl.keystore.password=keystore_password",
"--override", "ssl.key.password=key_password",
"--override", "ssl.truststore.password=truststore_password")))
- assertEquals(Password.HIDDEN, config.sslKeyPassword.toString)
- assertEquals(Password.HIDDEN, config.sslKeystorePassword.toString)
- assertEquals(Password.HIDDEN, config.sslTruststorePassword.toString)
+ assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeyPasswordProp).toString)
+ assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystorePasswordProp).toString)
+ assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslTruststorePasswordProp).toString)
- assertEquals("key_password", config.sslKeyPassword.value)
- assertEquals("keystore_password", config.sslKeystorePassword.value)
- assertEquals("truststore_password", config.sslTruststorePassword.value)
+ assertEquals("key_password", config.getPassword(KafkaConfig.SslKeyPasswordProp).value)
+ assertEquals("keystore_password", config.getPassword(KafkaConfig.SslKeystorePasswordProp).value)
+ assertEquals("truststore_password", config.getPassword(KafkaConfig.SslTruststorePasswordProp).value)
}
def prepareDefaultConfig(): String = {
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 6e78423..b3c46fa 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -27,10 +27,10 @@ import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.internals.KafkaFutureImpl
-import org.apache.kafka.common.{KafkaFuture, Node}
+import org.apache.kafka.common.Node
import org.apache.kafka.common.security.scram.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
-import org.easymock.{EasyMock, IAnswer}
+import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 13299c7..724e05b 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.security.scram.ScramMechanism
import org.apache.kafka.common.utils.{LogContext, MockTime, Time}
import org.apache.log4j.Level
import org.junit.Assert._
@@ -61,7 +62,7 @@ class SocketServerTest extends JUnitSuite {
props.put("connections.max.idle.ms", "60000")
val config = KafkaConfig.fromProps(props)
val metrics = new Metrics
- val credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, null)
+ val credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, null)
val localAddress = InetAddress.getLoopbackAddress
// Clean-up any metrics left around by previous tests
@@ -406,7 +407,7 @@ class SocketServerTest extends JUnitSuite {
// the following sleep is necessary to reliably detect the connection close when we send data below
Thread.sleep(200L)
// make sure the sockets are open
- server.acceptors.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
+ server.acceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
// then shutdown the server
shutdownServerAndMetrics(server)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 6dedbe0..2e3e274 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -17,13 +17,18 @@
package kafka.server
+import java.util
import java.util.Properties
import kafka.utils.TestUtils
+import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.junit.Assert._
import org.junit.Test
+import scala.collection.JavaConverters._
+
class DynamicBrokerConfigTest {
@Test
@@ -34,7 +39,7 @@ class DynamicBrokerConfigTest {
val config = KafkaConfig(props)
val dynamicConfig = config.dynamicConfig
assertSame(config, dynamicConfig.currentKafkaConfig)
- assertEquals(oldKeystore, config.sslKeystoreLocation)
+ assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertEquals(oldKeystore,
config.valuesFromThisConfigWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertEquals(oldKeystore, config.originalsFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@@ -55,7 +60,7 @@ class DynamicBrokerConfigTest {
assertEquals(newKeystore,
config.originalsWithPrefix("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
- assertEquals(oldKeystore, config.sslKeystoreLocation)
+ assertEquals(oldKeystore, config.getString(KafkaConfig.SslKeystoreLocationProp))
assertEquals(oldKeystore, config.originals.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertEquals(oldKeystore, config.originalsStrings.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@@ -75,38 +80,48 @@ class DynamicBrokerConfigTest {
origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
val config = KafkaConfig(origProps)
- def verifyConfigUpdateWithInvalidConfig(validProps: Map[String, String], invalidProps: Map[String, String]): Unit = {
- val props = new Properties
- validProps.foreach { case (k, v) => props.put(k, v) }
- invalidProps.foreach { case (k, v) => props.put(k, v) }
-
- // DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in
- // in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
- try {
- config.dynamicConfig.validate(props, perBrokerConfig = true)
- fail("Invalid config did not fail validation")
- } catch {
- case e: ConfigException => // expected exception
- }
-
- // DynamicBrokerConfig#updateBrokerConfig is used to update configs from ZooKeeper during
- // startup and when configs are updated in ZK. Update should apply valid configs and ignore
- // invalid ones.
- config.dynamicConfig.updateBrokerConfig(0, props)
- validProps.foreach { case (name, value) => assertEquals(value, config.originals.get(name)) }
- invalidProps.keySet.foreach { name =>
- assertEquals(origProps.get(name), config.originals.get(name))
- }
- }
+ val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12")
- val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" ->"ks.p12")
val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
- verifyConfigUpdateWithInvalidConfig(validProps, securityPropsWithoutListenerPrefix)
+ verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix)
val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
- verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps)
+ verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps)
+ // Test update of configs with invalid type
val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid")
- verifyConfigUpdateWithInvalidConfig(validProps, invalidProps)
+ verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
+ }
+
+ @Test
+ def testConfigUpdateWithReconfigurableValidationFailure(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ origProps.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "100000000")
+ val config = KafkaConfig(origProps)
+ val validProps = Map.empty[String, String]
+ val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "20")
+
+ def validateLogCleanerConfig(configs: util.Map[String, _]): Unit = {
+ val cleanerThreads = configs.get(KafkaConfig.LogCleanerThreadsProp).toString.toInt
+ if (cleanerThreads <=0 || cleanerThreads >= 5)
+ throw new ConfigException(s"Invalid cleaner threads $cleanerThreads")
+ }
+ val reconfigurable = new Reconfigurable {
+ override def configure(configs: util.Map[String, _]): Unit = {}
+ override def reconfigurableConfigs(): util.Set[String] = Set(KafkaConfig.LogCleanerThreadsProp).asJava
+ override def validateReconfiguration(configs: util.Map[String, _]): Unit = validateLogCleanerConfig(configs)
+ override def reconfigure(configs: util.Map[String, _]): Unit = {}
+ }
+ config.dynamicConfig.addReconfigurable(reconfigurable)
+ verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
+ config.dynamicConfig.removeReconfigurable(reconfigurable)
+
+ val brokerReconfigurable = new BrokerReconfigurable {
+ override def reconfigurableConfigs: collection.Set[String] = Set(KafkaConfig.LogCleanerThreadsProp)
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = validateLogCleanerConfig(newConfig.originals)
+ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {}
+ }
+ config.dynamicConfig.addBrokerReconfigurable(brokerReconfigurable)
+ verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
}
@Test
@@ -151,4 +166,86 @@ class DynamicBrokerConfigTest {
assertEquals(oldValue, config.originals.get(name))
}
}
+
+ private def verifyConfigUpdateWithInvalidConfig(config: KafkaConfig,
+ origProps: Properties,
+ validProps: Map[String, String],
+ invalidProps: Map[String, String]): Unit = {
+ val props = new Properties
+ validProps.foreach { case (k, v) => props.put(k, v) }
+ invalidProps.foreach { case (k, v) => props.put(k, v) }
+
+ // DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided
+ // in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
+ try {
+ config.dynamicConfig.validate(props, perBrokerConfig = true)
+ fail("Invalid config did not fail validation")
+ } catch {
+ case e: ConfigException => // expected exception
+ }
+
+ // DynamicBrokerConfig#updateBrokerConfig is used to update configs from ZooKeeper during
+ // startup and when configs are updated in ZK. Update should apply valid configs and ignore
+ // invalid ones.
+ config.dynamicConfig.updateBrokerConfig(0, props)
+ validProps.foreach { case (name, value) => assertEquals(value, config.originals.get(name)) }
+ invalidProps.keySet.foreach { name =>
+ assertEquals(origProps.get(name), config.originals.get(name))
+ }
+ }
+
+ @Test
+ def testPasswordConfigEncryption(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ val configWithoutSecret = KafkaConfig(props)
+ props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
+ val configWithSecret = KafkaConfig(props)
+ val dynamicProps = new Properties
+ dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "myLoginModule required;")
+ try {
+ configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
+ fail("Password config was converted to persistent form without a password encoder secret")
+ } catch {
+ case _: ConfigException => // expected: the test requires this to be rejected when no encoder secret is configured
+ }
+ val persistedProps = configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
+ assertFalse("Password not encoded",
+ persistedProps.getProperty(KafkaConfig.SaslJaasConfigProp).contains("myLoginModule"))
+ val decodedProps = configWithSecret.dynamicConfig.fromPersistentProps(persistedProps, perBrokerConfig = true)
+ assertEquals("myLoginModule required;", decodedProps.getProperty(KafkaConfig.SaslJaasConfigProp))
+ }
+
+ @Test
+ def testPasswordConfigEncoderSecretChange(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ props.put(KafkaConfig.SaslJaasConfigProp, "staticLoginModule required;")
+ props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
+ val config = KafkaConfig(props)
+ val dynamicProps = new Properties
+ dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "dynamicLoginModule required;")
+
+ val persistedProps = config.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
+ assertFalse("Password not encoded",
+ persistedProps.getProperty(KafkaConfig.SaslJaasConfigProp).contains("LoginModule"))
+ config.dynamicConfig.updateBrokerConfig(0, persistedProps)
+ assertEquals("dynamicLoginModule required;", config.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
+
+ // New config with same secret should use the dynamic password config
+ val newConfigWithSameSecret = KafkaConfig(props)
+ newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
+ assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
+
+ // New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig
+ props.put(KafkaConfig.PasswordEncoderSecretProp, "new-encoder-secret")
+ props.put(KafkaConfig.PasswordEncoderOldSecretProp, "config-encoder-secret")
+ val newConfigWithNewAndOldSecret = KafkaConfig(props)
+ newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
+ assertEquals("dynamicLoginModule required;", newConfigWithNewAndOldSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
+
+ // New config with new secret alone should revert to static password config since dynamic config cannot be decoded
+ props.put(KafkaConfig.PasswordEncoderSecretProp, "another-new-encoder-secret")
+ val newConfigWithNewSecret = KafkaConfig(props)
+ newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
+ assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 748efec..6b26334 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -674,12 +674,22 @@ class KafkaConfigTest {
case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
+ case KafkaConfig.SaslJaasConfigProp =>
+
+ // Password encoder configs
+ case KafkaConfig.PasswordEncoderSecretProp =>
+ case KafkaConfig.PasswordEncoderOldSecretProp =>
+ case KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp =>
+ case KafkaConfig.PasswordEncoderCipherAlgorithmProp =>
+ case KafkaConfig.PasswordEncoderKeyLengthProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
+ case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
//delegation token configs
case KafkaConfig.DelegationTokenMasterKeyProp => // ignore
case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+
case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
})
diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
new file mode 100755
index 0000000..11a2a7a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+
+import javax.crypto.SecretKeyFactory
+
+import kafka.server.Defaults
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.utils.Java
+import org.junit.Assert._
+import org.junit.Test
+
+class PasswordEncoderTest {
+
+ @Test
+ def testEncodeDecode(): Unit = {
+ val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ None,
+ Defaults.PasswordEncoderCipherAlgorithm,
+ Defaults.PasswordEncoderKeyLength,
+ Defaults.PasswordEncoderIterations)
+ val password = "test-password"
+ val encoded = encoder.encode(new Password(password))
+ val encodedMap = CoreUtils.parseCsvMap(encoded)
+ assertEquals("4096", encodedMap(PasswordEncoder.IterationsProp))
+ assertEquals("128", encodedMap(PasswordEncoder.KeyLengthProp))
+ val defaultKeyFactoryAlgorithm = try {
+ SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
+ "PBKDF2WithHmacSHA512"
+ } catch {
+ case _: Exception => "PBKDF2WithHmacSHA1"
+ }
+ assertEquals(defaultKeyFactoryAlgorithm, encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp))
+ assertEquals("AES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
+
+ verifyEncodedPassword(encoder, password, encoded)
+ }
+
+ @Test
+ def testEncoderConfigChange(): Unit = {
+ val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ Some("PBKDF2WithHmacSHA1"),
+ "DES/CBC/PKCS5Padding",
+ 64,
+ 1024)
+ val password = "test-password"
+ val encoded = encoder.encode(new Password(password))
+ val encodedMap = CoreUtils.parseCsvMap(encoded)
+ assertEquals("1024", encodedMap(PasswordEncoder.IterationsProp))
+ assertEquals("64", encodedMap(PasswordEncoder.KeyLengthProp))
+ assertEquals("PBKDF2WithHmacSHA1", encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp))
+ assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
+
+ // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
+ val decoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ Some("PBKDF2WithHmacSHA1"),
+ "AES/CBC/PKCS5Padding",
+ 128,
+ 2048)
+ assertEquals(password, decoder.decode(encoded).value)
+ // Test that decoding fails if secret is altered
+ val decoder2 = new PasswordEncoder(new Password("secret-2"),
+ Some("PBKDF2WithHmacSHA1"),
+ "AES/CBC/PKCS5Padding",
+ 128,
+ 1024)
+ try {
+ decoder2.decode(encoded)
+ fail("Password was decoded even though the encoder secret was altered")
+ } catch {
+ case _: ConfigException => // expected exception
+ }
+ }
+
+ @Test
+ def testEncodeDecodeAlgorithms(): Unit = {
+
+ def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = {
+ val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ keyFactoryAlg,
+ cipherAlg,
+ keyLength,
+ Defaults.PasswordEncoderIterations)
+ val password = "test-password"
+ val encoded = encoder.encode(new Password(password))
+ verifyEncodedPassword(encoder, password, encoded)
+ }
+
+ verifyEncodeDecode(keyFactoryAlg = None, "DES/CBC/PKCS5Padding", keyLength = 64)
+ verifyEncodeDecode(keyFactoryAlg = None, "DESede/CBC/PKCS5Padding", keyLength = 192)
+ verifyEncodeDecode(keyFactoryAlg = None, "AES/CBC/PKCS5Padding", keyLength = 128)
+ verifyEncodeDecode(keyFactoryAlg = None, "AES/CFB/PKCS5Padding", keyLength = 128)
+ verifyEncodeDecode(keyFactoryAlg = None, "AES/OFB/PKCS5Padding", keyLength = 128)
+ verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA1"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
+ if (Java.IS_JAVA8_COMPATIBLE) {
+ verifyEncodeDecode(keyFactoryAlg = None, "AES/GCM/PKCS5Padding", keyLength = 128)
+ verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA256"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
+ verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA512"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
+ }
+ }
+
+ private def verifyEncodedPassword(encoder: PasswordEncoder, password: String, encoded: String): Unit = {
+ val encodedMap = CoreUtils.parseCsvMap(encoded)
+ assertEquals(password.length.toString, encodedMap(PasswordEncoder.PasswordLengthProp))
+ assertNotNull("Invalid salt", encoder.base64Decode(encodedMap("salt")))
+ assertNotNull("Invalid encoding parameters", encoder.base64Decode(encodedMap(PasswordEncoder.InitializationVectorProp)))
+ assertNotNull("Invalid encoded password", encoder.base64Decode(encodedMap(PasswordEncoder.EncyrptedPasswordProp)))
+ assertEquals(password, encoder.decode(encoded).value)
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 407bdb5..303afa7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -718,7 +718,7 @@ object TestUtils extends Logging {
def deleteBrokersInZk(zkClient: KafkaZkClient, ids: Seq[Int]): Seq[Broker] = {
val brokers = ids.map(createBroker(_, "localhost", 6667, SecurityProtocol.PLAINTEXT))
- brokers.foreach(b => zkClient.deletePath(BrokerIdsZNode.path + "/" + b))
+ ids.foreach(b => zkClient.deletePath(BrokerIdsZNode.path + "/" + b))
brokers
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.