From commits-return-8854-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Sun Feb 4 18:19:27 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id B40A718064A for ; Sun, 4 Feb 2018 18:19:27 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A39B7160C40; Sun, 4 Feb 2018 17:19:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 31135160C37 for ; Sun, 4 Feb 2018 18:19:24 +0100 (CET) Received: (qmail 10433 invoked by uid 500); 4 Feb 2018 17:19:23 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 10424 invoked by uid 99); 4 Feb 2018 17:19:23 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Feb 2018 17:19:23 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id EF64381FCC; Sun, 4 Feb 2018 17:19:21 +0000 (UTC) Date: Sun, 04 Feb 2018 17:19:21 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-6246; Dynamic update of listeners and security configs (#4488) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <151776476149.26683.15052716633963097665@gitbox.apache.org> From: jgus@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: f9b56d680bab86fad276d01c8421d3679a788f2b X-Git-Newrev: 4019b21d602c0912d9dc0286f10701e7e724f750 X-Git-Rev: 4019b21d602c0912d9dc0286f10701e7e724f750 X-Git-NotificationType: 
ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4019b21 KAFKA-6246; Dynamic update of listeners and security configs (#4488) 4019b21 is described below commit 4019b21d602c0912d9dc0286f10701e7e724f750 Author: Rajini Sivaram AuthorDate: Sun Feb 4 09:19:17 2018 -0800 KAFKA-6246; Dynamic update of listeners and security configs (#4488) Dynamic update of listeners as described in KIP-226. This includes: - Addition of new listeners with listener-prefixed security configs - Removal of existing listeners - Password encryption - sasl.jaas.config property for broker's JAAS config prefixed with listener and mechanism name --- checkstyle/import-control.xml | 1 + .../org/apache/kafka/common/Reconfigurable.java | 9 +- .../apache/kafka/common/config/AbstractConfig.java | 16 + .../kafka/common/network/ChannelBuilders.java | 18 +- .../apache/kafka/common/network/ListenerName.java | 4 + .../kafka/common/network/SaslChannelBuilder.java | 58 +-- .../kafka/common/network/SslChannelBuilder.java | 4 +- .../apache/kafka/common/security/JaasContext.java | 84 ++--- .../security/authenticator/CredentialCache.java | 8 +- .../security/authenticator/LoginManager.java | 4 +- .../authenticator/SaslServerAuthenticator.java | 23 +- .../security/scram/ScramCredentialUtils.java | 4 +- .../kafka/common/security/ssl/SslFactory.java | 7 +- .../kafka/common/config/AbstractConfigTest.java | 49 +++ .../common/network/SaslChannelBuilderTest.java | 20 +- .../kafka/common/security/JaasContextTest.java | 25 +- .../authenticator/SaslAuthenticatorTest.java | 48 ++- .../authenticator/SaslServerAuthenticatorTest.java | 13 +- .../security/authenticator/TestJaasConfig.java | 14 + .../scala/kafka/controller/KafkaController.scala | 67 +++- 
.../main/scala/kafka/network/SocketServer.scala | 56 ++- .../scala/kafka/security/CredentialProvider.scala | 6 +- .../scala/kafka/server/DynamicBrokerConfig.scala | 254 ++++++++++--- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 99 +++-- core/src/main/scala/kafka/server/KafkaServer.scala | 9 +- .../main/scala/kafka/utils/PasswordEncoder.scala | 175 +++++++++ core/src/main/scala/kafka/zk/KafkaZkClient.scala | 7 + .../kafka/api/AdminClientIntegrationTest.scala | 4 +- .../scala/integration/kafka/api/SaslSetup.scala | 16 +- .../server/DynamicBrokerReconfigurationTest.scala | 404 +++++++++++++++++++-- ...pleListenersWithAdditionalJaasContextTest.scala | 24 +- ...ltipleListenersWithDefaultJaasContextTest.scala | 9 +- ...ListenersWithSameSecurityProtocolBaseTest.scala | 81 +++-- .../test/scala/unit/kafka/KafkaConfigTest.scala | 12 +- .../scala/unit/kafka/admin/ConfigCommandTest.scala | 4 +- .../unit/kafka/network/SocketServerTest.scala | 5 +- .../kafka/server/DynamicBrokerConfigTest.scala | 155 ++++++-- .../scala/unit/kafka/server/KafkaConfigTest.scala | 10 + .../unit/kafka/utils/PasswordEncoderTest.scala | 127 +++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 2 +- 41 files changed, 1574 insertions(+), 363 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 5662552..000acc3 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -50,6 +50,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java b/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java index 3339dce..8db9dc2 100644 --- a/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java +++ b/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common; +import org.apache.kafka.common.config.ConfigException; + import java.util.Map; import java.util.Set; @@ -33,9 +35,12 @@ 
public interface Reconfigurable extends Configurable { * Validates the provided configuration. The provided map contains * all configs including any reconfigurable configs that may be different * from the initial configuration. Reconfiguration will be not performed - * if this method returns false or throws any exception. + * if this method throws any exception. + * @throws ConfigException if the provided configs are not valid. The exception + * message from ConfigException will be returned to the client in + * the AlterConfigs response. */ - boolean validateReconfiguration(Map configs); + void validateReconfiguration(Map configs) throws ConfigException; /** * Reconfigures this instance with the given key-value pairs. The provided diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index ce3ae43..1713d78 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -204,6 +204,16 @@ public class AbstractConfig { * put all the remaining keys with the prefix stripped and their parsed values in the result map. * * This is useful if one wants to allow prefixed configs to override default ones. + *

+ * Two forms of prefixes are supported: + *

    + *
  • listener.name.{listenerName}.some.prop: If the provided prefix is `listener.name.{listenerName}.`, + * the key `some.prop` with the value parsed using the definition of `some.prop` is returned.
  • + *
  • listener.name.{listenerName}.{mechanism}.some.prop: If the provided prefix is `listener.name.{listenerName}.`, + * the key `{mechanism}.some.prop` with the value parsed using the definition of `some.prop` is returned. + * This is used to provide per-mechanism configs for a broker listener (e.g. sasl.jaas.config)
  • + *
+ *

*/ public Map valuesWithPrefixOverride(String prefix) { Map result = new RecordingMap<>(values(), prefix, true); @@ -213,6 +223,12 @@ public class AbstractConfig { ConfigDef.ConfigKey configKey = definition.configKeys().get(keyWithNoPrefix); if (configKey != null) result.put(keyWithNoPrefix, definition.parseValue(configKey, entry.getValue(), true)); + else { + String keyWithNoSecondaryPrefix = keyWithNoPrefix.substring(keyWithNoPrefix.indexOf('.') + 1); + configKey = definition.configKeys().get(keyWithNoSecondaryPrefix); + if (configKey != null) + result.put(keyWithNoPrefix, definition.parseValue(configKey, entry.getValue(), true)); + } } } return result; diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index e6df78e..80ccb7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -29,6 +29,9 @@ import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.token.delegation.DelegationTokenCache; import org.apache.kafka.common.utils.Utils; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; public class ChannelBuilders { @@ -105,9 +108,20 @@ public class ChannelBuilders { case SASL_SSL: case SASL_PLAINTEXT: requireNonNullMode(mode, securityProtocol); - JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs); + Map jaasContexts; + if (mode == Mode.SERVER) { + List enabledMechanisms = (List) configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG); + jaasContexts = new HashMap<>(enabledMechanisms.size()); + for (String mechanism : enabledMechanisms) + jaasContexts.put(mechanism, JaasContext.loadServerContext(listenerName, mechanism, configs)); + } else { + // Use server context for inter-broker client connections 
and client context for other clients + JaasContext jaasContext = contextType == JaasContext.Type.CLIENT ? JaasContext.loadClientContext(configs) : + JaasContext.loadServerContext(listenerName, clientSaslMechanism, configs); + jaasContexts = Collections.singletonMap(clientSaslMechanism, jaasContext); + } channelBuilder = new SaslChannelBuilder(mode, - jaasContext, + jaasContexts, securityProtocol, listenerName, isInterBrokerListener, diff --git a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java index 9da4cca..fc0cb14 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java @@ -71,4 +71,8 @@ public final class ListenerName { public String configPrefix() { return CONFIG_STATIC_PREFIX + "." + value.toLowerCase(Locale.ROOT) + "."; } + + public String saslMechanismConfigPrefix(String saslMechanism) { + return configPrefix() + saslMechanism.toLowerCase(Locale.ROOT) + "."; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 1716e0e..095f826 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -41,6 +41,7 @@ import java.net.Socket; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -55,18 +56,19 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl private final boolean isInterBrokerListener; private final String clientSaslMechanism; private final Mode mode; - private final JaasContext jaasContext; + private final Map jaasContexts; private 
final boolean handshakeRequestEnable; private final CredentialCache credentialCache; private final DelegationTokenCache tokenCache; + private final Map loginManagers; + private final Map subjects; - private LoginManager loginManager; private SslFactory sslFactory; private Map configs; private KerberosShortNamer kerberosShortNamer; public SaslChannelBuilder(Mode mode, - JaasContext jaasContext, + Map jaasContexts, SecurityProtocol securityProtocol, ListenerName listenerName, boolean isInterBrokerListener, @@ -75,7 +77,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl CredentialCache credentialCache, DelegationTokenCache tokenCache) { this.mode = mode; - this.jaasContext = jaasContext; + this.jaasContexts = jaasContexts; + this.loginManagers = new HashMap<>(jaasContexts.size()); + this.subjects = new HashMap<>(jaasContexts.size()); this.securityProtocol = securityProtocol; this.listenerName = listenerName; this.isInterBrokerListener = isInterBrokerListener; @@ -89,14 +93,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl public void configure(Map configs) throws KafkaException { try { this.configs = configs; - boolean hasKerberos; - if (mode == Mode.SERVER) { - @SuppressWarnings("unchecked") - List enabledMechanisms = (List) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG); - hasKerberos = enabledMechanisms == null || enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM); - } else { - hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM); - } + boolean hasKerberos = jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM); if (hasKerberos) { String defaultRealm; @@ -110,8 +107,14 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl if (principalToLocalRules != null) kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules); } - this.loginManager = 
LoginManager.acquireLoginManager(jaasContext, hasKerberos, configs); - + for (Map.Entry entry : jaasContexts.entrySet()) { + String mechanism = entry.getKey(); + // With static JAAS configuration, use KerberosLogin if Kerberos is enabled. With dynamic JAAS configuration, + // use KerberosLogin only for the LoginContext corresponding to GSSAPI + LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, hasKerberos, configs); + loginManagers.put(mechanism, loginManager); + subjects.put(mechanism, loginManager.subject()); + } if (this.securityProtocol == SecurityProtocol.SASL_SSL) { // Disable SSL client authentication as we are using SASL authentication this.sslFactory = new SslFactory(mode, "none", isInterBrokerListener); @@ -129,11 +132,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl } @Override - public boolean validateReconfiguration(Map configs) { + public void validateReconfiguration(Map configs) { if (this.securityProtocol == SecurityProtocol.SASL_SSL) - return sslFactory.validateReconfiguration(configs); - else - return true; + sslFactory.validateReconfiguration(configs); } @Override @@ -154,11 +155,13 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl Socket socket = socketChannel.socket(); TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel); Authenticator authenticator; - if (mode == Mode.SERVER) - authenticator = buildServerAuthenticator(configs, id, transportLayer, loginManager.subject()); - else + if (mode == Mode.SERVER) { + authenticator = buildServerAuthenticator(configs, id, transportLayer, subjects); + } else { + LoginManager loginManager = loginManagers.get(clientSaslMechanism); authenticator = buildClientAuthenticator(configs, id, socket.getInetAddress().getHostName(), loginManager.serviceName(), transportLayer, loginManager.subject()); + } return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, 
memoryPool != null ? memoryPool : MemoryPool.NONE); } catch (Exception e) { log.info("Failed to create channel due to ", e); @@ -168,10 +171,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl @Override public void close() { - if (loginManager != null) { + for (LoginManager loginManager : loginManagers.values()) loginManager.release(); - loginManager = null; - } + loginManagers.clear(); } private TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException { @@ -185,8 +187,8 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl // Visible to override for testing protected SaslServerAuthenticator buildServerAuthenticator(Map configs, String id, - TransportLayer transportLayer, Subject subject) throws IOException { - return new SaslServerAuthenticator(configs, id, jaasContext, subject, + TransportLayer transportLayer, Map subjects) throws IOException { + return new SaslServerAuthenticator(configs, id, jaasContexts, subjects, kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer, tokenCache); } @@ -198,8 +200,8 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl } // Package private for testing - LoginManager loginManager() { - return loginManager; + Map loginManagers() { + return loginManagers; } private static String defaultKerberosRealm() throws ClassNotFoundException, NoSuchMethodException, diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index e024d32..941c455 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -71,8 +71,8 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable } @Override - public boolean 
validateReconfiguration(Map configs) { - return sslFactory.validateReconfiguration(configs); + public void validateReconfiguration(Map configs) { + sslFactory.validateReconfiguration(configs); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java index a13acd2..46db345 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java +++ b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java @@ -41,57 +41,59 @@ public class JaasContext { /** * Returns an instance of this class. * - * For contextType SERVER, the context will contain the default Configuration and the context name will be one of: + * The context will contain the configuration specified by the JAAS configuration property + * {@link SaslConfigs#SASL_JAAS_CONFIG} with prefix `listener.name.{listenerName}.{mechanism}.` + * with listenerName and mechanism in lower case. The context `KafkaServer` will be returned + * with a single login context entry loaded from the property. + *

+ * If the property is not defined, the context will contain the default Configuration and + * the context name will be one of: + *

    + *
  1. Lowercased listener name followed by a period and the string `KafkaServer`
  2. + *
  3. The string `KafkaServer`
  4. + *
+ * If both are valid entries in the default JAAS configuration, the first option is chosen. + *

* - * 1. Lowercased listener name followed by a period and the string `KafkaServer` - * 2. The string `KafkaServer` - * - * If both are valid entries in the JAAS configuration, the first option is chosen. + * @throws IllegalArgumentException if listenerName or mechanism is not defined. + */ + public static JaasContext loadServerContext(ListenerName listenerName, String mechanism, Map configs) { + if (listenerName == null) + throw new IllegalArgumentException("listenerName should not be null for SERVER"); + if (mechanism == null) + throw new IllegalArgumentException("mechanism should not be null for SERVER"); + String globalContextName = GLOBAL_CONTEXT_NAME_SERVER; + String listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER; + Password jaasConfigArgs = (Password) configs.get(mechanism.toLowerCase(Locale.ROOT) + "." + SaslConfigs.SASL_JAAS_CONFIG); + if (jaasConfigArgs == null && configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) + LOG.warn("Server config {} should be prefixed with SASL mechanism name, ignoring config", SaslConfigs.SASL_JAAS_CONFIG); + return load(Type.SERVER, listenerContextName, globalContextName, jaasConfigArgs); + } + + /** + * Returns an instance of this class. * - * For contextType CLIENT, if JAAS configuration property @link SaslConfigs#SASL_JAAS_CONFIG} is specified, + * If JAAS configuration property @link SaslConfigs#SASL_JAAS_CONFIG} is specified, * the configuration object is created by parsing the property value. Otherwise, the default Configuration * is returned. The context name is always `KafkaClient`. * - * @throws IllegalArgumentException if JAAS configuration property is specified for contextType SERVER, if - * listenerName is not defined for contextType SERVER of if listenerName is defined for contextType CLIENT. 
*/ - public static JaasContext load(JaasContext.Type contextType, ListenerName listenerName, - Map configs) { - String listenerContextName; - String globalContextName; - switch (contextType) { - case CLIENT: - if (listenerName != null) - throw new IllegalArgumentException("listenerName should be null for CLIENT"); - globalContextName = GLOBAL_CONTEXT_NAME_CLIENT; - listenerContextName = null; - break; - case SERVER: - if (listenerName == null) - throw new IllegalArgumentException("listenerName should not be null for SERVER"); - globalContextName = GLOBAL_CONTEXT_NAME_SERVER; - listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER; - break; - default: - throw new IllegalArgumentException("Unexpected context type " + contextType); - } - return load(contextType, listenerContextName, globalContextName, configs); + public static JaasContext loadClientContext(Map configs) { + String globalContextName = GLOBAL_CONTEXT_NAME_CLIENT; + Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG); + return load(JaasContext.Type.CLIENT, null, globalContextName, jaasConfigArgs); } static JaasContext load(JaasContext.Type contextType, String listenerContextName, - String globalContextName, Map configs) { - Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG); + String globalContextName, Password jaasConfigArgs) { if (jaasConfigArgs != null) { - if (contextType == JaasContext.Type.SERVER) - throw new IllegalArgumentException("JAAS config property not supported for server"); - else { - JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value()); - AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(globalContextName); - int numModules = clientModules == null ? 
0 : clientModules.length; - if (numModules != 1) - throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be 1 module"); - return new JaasContext(globalContextName, contextType, jaasConfig); - } + JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value()); + AppConfigurationEntry[] contextModules = jaasConfig.getAppConfigurationEntry(globalContextName); + if (contextModules == null || contextModules.length == 0) + throw new IllegalArgumentException("JAAS config property does not contain any login modules"); + else if (contextModules.length != 1) + throw new IllegalArgumentException("JAAS config property contains " + contextModules.length + " login modules, should be 1 module"); + return new JaasContext(globalContextName, contextType, jaasConfig); } else return defaultContext(contextType, listenerContextName, globalContextName); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java index aa39101..96e7426 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java @@ -16,18 +16,16 @@ */ package org.apache.kafka.common.security.authenticator; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class CredentialCache { - private final Map> cacheMap = new HashMap<>(); + private final ConcurrentHashMap> cacheMap = new ConcurrentHashMap<>(); public Cache createCache(String mechanism, Class credentialClass) { Cache cache = new Cache(credentialClass); - cacheMap.put(mechanism, cache); - return cache; + Cache oldCache = (Cache) cacheMap.putIfAbsent(mechanism, cache); + return oldCache == null ? 
cache : oldCache; } @SuppressWarnings("unchecked") diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java index a576e37..dc264c8 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java @@ -64,7 +64,7 @@ public class LoginManager { * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more * complicated to do the latter without making the consumer API more complex. */ - public static LoginManager acquireLoginManager(JaasContext jaasContext, boolean hasKerberos, + public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, boolean hasKerberos, Map configs) throws IOException, LoginException { synchronized (LoginManager.class) { // SASL_JAAS_CONFIG is only supported by clients @@ -73,7 +73,7 @@ public class LoginManager { if (jaasContext.type() == JaasContext.Type.CLIENT && jaasConfigValue != null) { loginManager = DYNAMIC_INSTANCES.get(jaasConfigValue); if (loginManager == null) { - loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue); + loginManager = new LoginManager(jaasContext, saslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM), configs, jaasConfigValue); DYNAMIC_INSTANCES.put(jaasConfigValue, loginManager); } } else { diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 6fcca57..ca6e9d2 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -101,8 +101,8 @@ public 
class SaslServerAuthenticator implements Authenticator { private final SecurityProtocol securityProtocol; private final ListenerName listenerName; private final String connectionId; - private final JaasContext jaasContext; - private final Subject subject; + private final Map jaasContexts; + private final Map subjects; private final CredentialCache credentialCache; private final TransportLayer transportLayer; private final Set enabledMechanisms; @@ -128,19 +128,17 @@ public class SaslServerAuthenticator implements Authenticator { public SaslServerAuthenticator(Map configs, String connectionId, - JaasContext jaasContext, - Subject subject, + Map jaasContexts, + Map subjects, KerberosShortNamer kerberosNameParser, CredentialCache credentialCache, ListenerName listenerName, SecurityProtocol securityProtocol, TransportLayer transportLayer, DelegationTokenCache tokenCache) throws IOException { - if (subject == null) - throw new IllegalArgumentException("subject cannot be null"); this.connectionId = connectionId; - this.jaasContext = jaasContext; - this.subject = subject; + this.jaasContexts = jaasContexts; + this.subjects = subjects; this.credentialCache = credentialCache; this.listenerName = listenerName; this.securityProtocol = securityProtocol; @@ -154,6 +152,12 @@ public class SaslServerAuthenticator implements Authenticator { if (enabledMechanisms == null || enabledMechanisms.isEmpty()) throw new IllegalArgumentException("No SASL mechanisms are enabled"); this.enabledMechanisms = new HashSet<>(enabledMechanisms); + for (String mechanism : enabledMechanisms) { + if (!jaasContexts.containsKey(mechanism)) + throw new IllegalArgumentException("Jaas context not specified for SASL mechanism " + mechanism); + if (!subjects.containsKey(mechanism)) + throw new IllegalArgumentException("Subject cannot be null for SASL mechanism " + mechanism); + } // Note that the old principal builder does not support SASL, so we do not need to pass the // authenticator or the transport 
layer @@ -162,8 +166,9 @@ public class SaslServerAuthenticator implements Authenticator { private void createSaslServer(String mechanism) throws IOException { this.saslMechanism = mechanism; + Subject subject = subjects.get(mechanism); if (!ScramMechanism.isScram(mechanism)) - callbackHandler = new SaslServerCallbackHandler(jaasContext); + callbackHandler = new SaslServerCallbackHandler(jaasContexts.get(mechanism)); else callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class), tokenCache); callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java index 8d80542..b4875d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java @@ -76,9 +76,9 @@ public final class ScramCredentialUtils { return props; } - public static void createCache(CredentialCache cache, Collection enabledMechanisms) { + public static void createCache(CredentialCache cache, Collection mechanisms) { for (String mechanism : ScramMechanism.mechanismNames()) { - if (enabledMechanisms.contains(mechanism)) + if (mechanisms.contains(mechanism)) cache.createCache(mechanism, ScramCredential.class); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index 0d1fbf9..c54260d 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -144,14 +144,13 @@ public class SslFactory implements Reconfigurable { } @Override - public boolean validateReconfiguration(Map configs) { + public void 
validateReconfiguration(Map configs) { try { SecurityStore newKeystore = maybeCreateNewKeystore(configs); if (newKeystore != null) createSSLContext(newKeystore); - return true; } catch (Exception e) { - throw new KafkaException("Validation of dynamic config update failed", e); + throw new ConfigException("Validation of dynamic config update failed", e); } } @@ -163,7 +162,7 @@ public class SslFactory implements Reconfigurable { this.sslContext = createSSLContext(newKeystore); this.keystore = newKeystore; } catch (Exception e) { - throw new KafkaException("Reconfiguration of SSL keystore failed", e); + throw new ConfigException("Reconfiguration of SSL keystore failed", e); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index 074df10..071deed 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.common.config; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.metrics.FakeMetricsReporter; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricsReporter; @@ -140,6 +141,54 @@ public class AbstractConfigTest { } @Test + public void testValuesWithSecondaryPrefix() { + String prefix = "listener.name.listener1."; + Password saslJaasConfig1 = new Password("test.myLoginModule1 required;"); + Password saslJaasConfig2 = new Password("test.myLoginModule2 required;"); + Password saslJaasConfig3 = new Password("test.myLoginModule3 required;"); + Properties props = new Properties(); + props.put("listener.name.listener1.test-mechanism.sasl.jaas.config", 
saslJaasConfig1.value()); + props.put("test-mechanism.sasl.jaas.config", saslJaasConfig2.value()); + props.put("sasl.jaas.config", saslJaasConfig3.value()); + props.put("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2"); + props.put("listener.name.listener1.gssapi.sasl.kerberos.service.name", "testkafka"); + props.put("listener.name.listener1.gssapi.sasl.kerberos.min.time.before.relogin", "60000"); + props.put("ssl.provider", "TEST"); + TestSecurityConfig config = new TestSecurityConfig(props); + Map valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix); + + // prefix with mechanism overrides global + assertTrue(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config")); + assertTrue(config.unused().contains("test-mechanism.sasl.jaas.config")); + assertEquals(saslJaasConfig1, valuesWithPrefixOverride.get("test-mechanism.sasl.jaas.config")); + assertEquals(saslJaasConfig3, valuesWithPrefixOverride.get("sasl.jaas.config")); + assertFalse(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config")); + assertFalse(config.unused().contains("test-mechanism.sasl.jaas.config")); + assertFalse(config.unused().contains("sasl.jaas.config")); + + // prefix with mechanism overrides default + assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd")); + assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd")); + assertFalse(config.unused().contains("gssapi.sasl.kerberos.kinit.cmd")); + assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd")); + assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.kinit.cmd")); + assertFalse(config.unused().contains("listener.name.listener1.sasl.kerberos.kinit.cmd")); + + // prefix override for mechanism with no default + assertFalse(config.unused().contains("sasl.kerberos.service.name")); + 
assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name")); + assertFalse(config.unused().contains("gssapi.sasl.kerberos.service.name")); + assertFalse(config.unused().contains("sasl.kerberos.service.name")); + assertEquals("testkafka", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.service.name")); + assertFalse(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name")); + + // unset with no default + assertTrue(config.unused().contains("ssl.provider")); + assertNull(valuesWithPrefixOverride.get("gssapi.ssl.provider")); + assertTrue(config.unused().contains("ssl.provider")); + } + + @Test public void testValuesWithPrefixAllOrNothing() { String prefix1 = "prefix1."; String prefix2 = "prefix2."; diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java index 86d26d4..6072bf5 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java @@ -26,9 +26,10 @@ import org.junit.Test; import java.util.Collections; import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class SaslChannelBuilderTest { @@ -37,20 +38,20 @@ public class SaslChannelBuilderTest { public void testCloseBeforeConfigureIsIdempotent() { SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT); builder.close(); - assertNull(builder.loginManager()); + assertTrue(builder.loginManagers().isEmpty()); builder.close(); - assertNull(builder.loginManager()); + assertTrue(builder.loginManagers().isEmpty()); } @Test public void testCloseAfterConfigIsIdempotent() { SaslChannelBuilder builder = 
createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT); builder.configure(new HashMap()); - assertNotNull(builder.loginManager()); + assertNotNull(builder.loginManagers().get("PLAIN")); builder.close(); - assertNull(builder.loginManager()); + assertTrue(builder.loginManagers().isEmpty()); builder.close(); - assertNull(builder.loginManager()); + assertTrue(builder.loginManagers().isEmpty()); } @Test @@ -61,17 +62,18 @@ public class SaslChannelBuilderTest { builder.configure(Collections.singletonMap(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "1")); fail("Exception should have been thrown"); } catch (KafkaException e) { - assertNull(builder.loginManager()); + assertTrue(builder.loginManagers().isEmpty()); } builder.close(); - assertNull(builder.loginManager()); + assertTrue(builder.loginManagers().isEmpty()); } private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol) { TestJaasConfig jaasConfig = new TestJaasConfig(); jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap()); JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig); - return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, new ListenerName("PLAIN"), + Map jaasContexts = Collections.singletonMap("PLAIN", jaasContext); + return new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, new ListenerName("PLAIN"), false, "PLAIN", true, null, null); } diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java index 49d5a86..e8535d2 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java @@ -188,8 +188,8 @@ public class JaasContextTest { "KafkaServer { test.LoginModuleDefault required; };", "plaintext.KafkaServer { test.LoginModuleOverride requisite; };" )); - JaasContext 
context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"), - Collections.emptyMap()); + JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"), + "SOME-MECHANISM", Collections.emptyMap()); assertEquals("plaintext.KafkaServer", context.name()); assertEquals(JaasContext.Type.SERVER, context.type()); assertEquals(1, context.configurationEntries().size()); @@ -203,8 +203,8 @@ public class JaasContextTest { "KafkaServer { test.LoginModule required; };", "other.KafkaServer { test.LoginModuleOther requisite; };" )); - JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"), - Collections.emptyMap()); + JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"), + "SOME-MECHANISM", Collections.emptyMap()); assertEquals("KafkaServer", context.name()); assertEquals(JaasContext.Type.SERVER, context.type()); assertEquals(1, context.configurationEntries().size()); @@ -215,24 +215,13 @@ public class JaasContextTest { @Test(expected = IllegalArgumentException.class) public void testLoadForServerWithWrongListenerName() throws IOException { writeConfiguration("Server", "test.LoginModule required;"); - JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"), - Collections.emptyMap()); - } - - /** - * ListenerName can only be used with Type.SERVER. 
- */ - @Test(expected = IllegalArgumentException.class) - public void testLoadForClientWithListenerName() { - JaasContext.load(JaasContext.Type.CLIENT, new ListenerName("foo"), + JaasContext.loadServerContext(new ListenerName("plaintext"), "SOME-MECHANISM", Collections.emptyMap()); } private AppConfigurationEntry configurationEntry(JaasContext.Type contextType, String jaasConfigProp) { - Map configs = new HashMap<>(); - if (jaasConfigProp != null) - configs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigProp)); - JaasContext context = JaasContext.load(contextType, null, contextType.name(), configs); + Password saslJaasConfig = jaasConfigProp == null ? null : new Password(jaasConfigProp); + JaasContext context = JaasContext.load(contextType, null, contextType.name(), saslJaasConfig); List entries = context.configurationEntries(); assertEquals(1, entries.size()); return entries.get(0); diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index e3d6b7a..ef2a075 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -715,7 +715,7 @@ public class SaslAuthenticatorTest { * property override is used during authentication. */ @Test - public void testDynamicJaasConfiguration() throws Exception { + public void testClientDynamicJaasConfiguration() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN")); @@ -756,6 +756,37 @@ public class SaslAuthenticatorTest { } } + /** + * Tests dynamic JAAS configuration property for SASL server. 
Invalid server credentials + * are set in the static JVM-wide configuration instance to ensure that the dynamic + * property override is used during authentication. + */ + @Test + public void testServerDynamicJaasConfiguration() throws Exception { + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN")); + Map serverOptions = new HashMap<>(); + serverOptions.put("user_user1", "user1-secret"); + serverOptions.put("user_user2", "user2-secret"); + saslServerConfigs.put("listener.name.sasl_ssl.plain." + SaslConfigs.SASL_JAAS_CONFIG, + TestJaasConfig.jaasConfigProperty("PLAIN", serverOptions)); + TestJaasConfig staticJaasConfig = new TestJaasConfig(); + staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), + Collections.emptyMap()); + staticJaasConfig.setClientOptions("PLAIN", "user1", "user1-secret"); + Configuration.setConfiguration(staticJaasConfig); + server = createEchoServer(securityProtocol); + + // Check that 'user1' can connect with static Jaas config + createAndCheckClientConnection(securityProtocol, "1"); + + // Check that user 'user2' can also connect with a Jaas config override + saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, + TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret")); + createAndCheckClientConnection(securityProtocol, "2"); + } + @Test public void testJaasConfigurationForListener() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; @@ -986,18 +1017,19 @@ public class SaslAuthenticatorTest { throws Exception { final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); final Map configs = Collections.emptyMap(); - final JaasContext jaasContext = JaasContext.load(JaasContext.Type.SERVER, listenerName, configs); + final JaasContext jaasContext 
= JaasContext.loadServerContext(listenerName, saslMechanism, configs); + final Map jaasContexts = Collections.singletonMap(saslMechanism, jaasContext); boolean isScram = ScramMechanism.isScram(saslMechanism); if (isScram) ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism)); - SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext, + SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts, securityProtocol, listenerName, false, saslMechanism, true, credentialCache, null) { @Override protected SaslServerAuthenticator buildServerAuthenticator(Map configs, String id, - TransportLayer transportLayer, Subject subject) throws IOException { - return new SaslServerAuthenticator(configs, id, jaasContext, subject, null, + TransportLayer transportLayer, Map subjects) throws IOException { + return new SaslServerAuthenticator(configs, id, jaasContexts, subjects, null, credentialCache, listenerName, securityProtocol, transportLayer, null) { @Override @@ -1032,8 +1064,10 @@ public class SaslAuthenticatorTest { final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); final Map configs = Collections.emptyMap(); - final JaasContext jaasContext = JaasContext.load(JaasContext.Type.CLIENT, null, configs); - SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext, + final JaasContext jaasContext = JaasContext.loadClientContext(configs); + final Map jaasContexts = Collections.singletonMap(saslMechanism, jaasContext); + + SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, listenerName, false, saslMechanism, true, null, null) { @Override diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index 51ea58e..72c2969 
100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -51,7 +51,7 @@ public class SaslServerAuthenticatorTest { TransportLayer transportLayer = EasyMock.mock(TransportLayer.class); Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); - SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); final Capture size = EasyMock.newCapture(); EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer() { @@ -72,7 +72,7 @@ public class SaslServerAuthenticatorTest { TransportLayer transportLayer = EasyMock.mock(TransportLayer.class); Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); - SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243); final Struct headerStruct = header.toStruct(); @@ -106,12 +106,13 @@ public class SaslServerAuthenticatorTest { } } - private SaslServerAuthenticator setupAuthenticator(Map configs, TransportLayer transportLayer) throws IOException { + private SaslServerAuthenticator setupAuthenticator(Map configs, TransportLayer transportLayer, String mechanism) throws IOException { TestJaasConfig jaasConfig = new TestJaasConfig(); jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap()); - JaasContext jaasContext = new JaasContext("jaasContext", 
JaasContext.Type.SERVER, jaasConfig); - Subject subject = new Subject(); - return new SaslServerAuthenticator(configs, "node", jaasContext, subject, null, new CredentialCache(), + Map jaasContexts = Collections.singletonMap(mechanism, + new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig)); + Map subjects = Collections.singletonMap(mechanism, new Subject()); + return new SaslServerAuthenticator(configs, "node", jaasContexts, subjects, null, new CredentialCache(), new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, new DelegationTokenCache(ScramMechanism.mechanismNames())); } diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java index 5336fd7..dafa79d 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java @@ -54,6 +54,20 @@ public class TestJaasConfig extends Configuration { return new Password(loginModule(mechanism) + " required username=" + username + " password=" + password + ";"); } + public static Password jaasConfigProperty(String mechanism, Map options) { + StringBuilder builder = new StringBuilder(); + builder.append(loginModule(mechanism)); + builder.append(" required"); + for (Map.Entry option : options.entrySet()) { + builder.append(' '); + builder.append(option.getKey()); + builder.append('='); + builder.append(option.getValue()); + } + builder.append(';'); + return new Password(builder.toString()); + } + public void setClientOptions(String saslMechanism, String clientUsername, String clientPassword) { Map options = new HashMap<>(); if (clientUsername != null) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a8e1249..f36fc79 100644 --- 
a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -54,11 +54,13 @@ object KafkaController extends Logging { } -class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, brokerInfo: BrokerInfo, +class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo, tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { this.logIdent = s"[Controller id=${config.brokerId}] " + @volatile private var brokerInfo = initialBrokerInfo + private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None) val controllerContext = new ControllerContext @@ -77,6 +79,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager) private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager) + private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty private val topicChangeHandler = new TopicChangeHandler(this, eventManager) private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager) private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty @@ -274,6 +277,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path) zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path) zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path) + unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet) // reset topic deletion manager topicDeletionManager.reset() @@ -360,6 +364,23 @@ class KafkaController(val config: 
KafkaConfig, zkClient: KafkaZkClient, time: Ti s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics") topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic)) } + registerBrokerModificationsHandler(newBrokers) + } + + private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = { + debug(s"Register BrokerModifications handler for $brokerIds") + brokerIds.foreach { brokerId => + val brokerModificationsHandler = new BrokerModificationsHandler(this, eventManager, brokerId) + zkClient.registerZNodeChangeHandler(brokerModificationsHandler) + brokerModificationsHandlers.put(brokerId, brokerModificationsHandler) + } + } + + private def unregisterBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = { + debug(s"Unregister BrokerModifications handler for $brokerIds") + brokerIds.foreach { brokerId => + brokerModificationsHandlers.remove(brokerId).foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path)) + } } /* @@ -374,6 +395,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti info(s"Removed $deadBrokersThatWereShuttingDown from list of shutting down brokers.") val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet) onReplicasBecomeOffline(allReplicasOnDeadBrokers) + + unregisterBrokerModificationsHandler(deadBrokers) + } + + private def onBrokerUpdate(updatedBrokers: Seq[Int]) { + info(s"Broker info update callback for ${updatedBrokers.mkString(",")}") + sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) } /** @@ -613,6 +641,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet) controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch] 
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int] + // register broker modifications handlers + registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id)) // update the leader and isr cache for all existing partitions from Zookeeper updateLeaderAndIsrCache() // start the channel manager @@ -1209,6 +1239,29 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } } + case object BrokerModifications extends ControllerEvent { + override def state: ControllerState = ControllerState.BrokerChange + + override def process(): Unit = { + if (!isActive) return + val curBrokers = zkClient.getAllBrokersInCluster.toSet + val updatedBrokers = controllerContext.liveBrokers.filter { broker => + val existingBroker = curBrokers.find(_.id == broker.id) + existingBroker match { + case Some(b) => broker.endPoints != b.endPoints + case None => false + } + } + if (updatedBrokers.nonEmpty) { + val updatedBrokerIdsSorted = updatedBrokers.map(_.id).toSeq.sorted + info(s"Updated brokers: $updatedBrokers") + + controllerContext.liveBrokers = curBrokers // Update broker metadata + onBrokerUpdate(updatedBrokerIdsSorted) + } + } + } + case object TopicChange extends ControllerEvent { override def state: ControllerState = ControllerState.TopicChange @@ -1458,7 +1511,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti class BrokerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler { override val path: String = BrokerIdsZNode.path - override def handleChildChange(): Unit = eventManager.put(controller.BrokerChange) + override def handleChildChange(): Unit = { + eventManager.put(controller.BrokerChange) + } +} + +class BrokerModificationsHandler(controller: KafkaController, eventManager: ControllerEventManager, brokerId: Int) extends ZNodeChangeHandler { + override val path: String = BrokerIdZNode.path(brokerId) + + override def 
handleDataChange(): Unit = { + eventManager.put(controller.BrokerModifications) + } } class TopicChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler { diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index f6ec974..c11ebcb 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -54,7 +54,6 @@ import scala.util.control.ControlThrowable */ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup { - private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap private val maxQueuedRequests = config.queuedMaxRequests private val maxConnectionsPerIp = config.maxConnectionsPerIp @@ -72,7 +71,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private val processors = new ConcurrentHashMap[Int, Processor]() private var nextProcessorId = 0 - private[network] val acceptors = mutable.Map[EndPoint, Acceptor]() + private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]() private var connectionQuotas: ConnectionQuotas = _ private var stoppedProcessingRequests = false @@ -82,7 +81,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time def startup() { this.synchronized { connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) - createProcessors(config.numNetworkThreads) + createProcessors(config.numNetworkThreads, config.listeners) } newGauge("NetworkProcessorAvgIdlePercent", @@ -111,14 +110,17 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time info("Started " + acceptors.size + " acceptor threads") } - private def createProcessors(newProcessorsPerListener: Int): Unit = synchronized { + private def endpoints 
= config.listeners.map(l => l.listenerName -> l).toMap + + private def createProcessors(newProcessorsPerListener: Int, + endpoints: Seq[EndPoint]): Unit = synchronized { val sendBufferSize = config.socketSendBufferBytes val recvBufferSize = config.socketReceiveBufferBytes val brokerId = config.brokerId val numProcessorThreads = config.numNetworkThreads - config.listeners.foreach { endpoint => + endpoints.foreach { endpoint => val listenerName = endpoint.listenerName val securityProtocol = endpoint.securityProtocol val listenerProcessors = new ArrayBuffer[Processor]() @@ -130,12 +132,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time } listenerProcessors.foreach(p => processors.put(p.id, p)) - val acceptor = acceptors.getOrElseUpdate(endpoint, { - val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas) - KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start() - acceptor.awaitStartup() - acceptor - }) + val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas) + KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start() + acceptor.awaitStartup() + acceptors.put(endpoint, acceptor) acceptor.addProcessors(listenerProcessors) } } @@ -153,7 +153,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time def stopProcessingRequests() = { info("Stopping socket server request processors") this.synchronized { - acceptors.values.foreach(_.shutdown) + acceptors.asScala.values.foreach(_.shutdown) processors.asScala.values.foreach(_.shutdown) requestChannel.clear() stoppedProcessingRequests = true @@ -161,12 +161,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time info("Stopped socket server request processors") } - def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): 
Unit = { + def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized { info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads") if (newNumNetworkThreads > oldNumNetworkThreads) - createProcessors(newNumNetworkThreads - oldNumNetworkThreads) + createProcessors(newNumNetworkThreads - oldNumNetworkThreads, config.listeners) else if (newNumNetworkThreads < oldNumNetworkThreads) - acceptors.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel)) + acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel)) } /** @@ -185,9 +185,22 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time def boundPort(listenerName: ListenerName): Int = { try { - acceptors(endpoints(listenerName)).serverChannel.socket.getLocalPort + acceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort } catch { - case e: Exception => throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e) + case e: Exception => + throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e) + } + } + + def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized { + info(s"Adding listeners for endpoints $listenersAdded") + createProcessors(config.numNetworkThreads, listenersAdded) + } + + def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized { + info(s"Removing listeners for endpoints $listenersRemoved") + listenersRemoved.foreach { endpoint => + acceptors.asScala.remove(endpoint).foreach(_.shutdown()) } } @@ -239,8 +252,8 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete */ def shutdown(): Unit = 
{ - alive.set(false) - wakeup() + if (alive.getAndSet(false)) + wakeup() shutdownLatch.await() } @@ -312,6 +325,11 @@ private[kafka] class Acceptor(val endPoint: EndPoint, toRemove.foreach(processor => requestChannel.removeProcessor(processor.id)) } + override def shutdown(): Unit = { + super.shutdown() + processors.foreach(_.shutdown()) + } + /** * Accept loop that checks for new connection attempts */ diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala index 120e8f9..0e7ebb6 100644 --- a/core/src/main/scala/kafka/security/CredentialProvider.scala +++ b/core/src/main/scala/kafka/security/CredentialProvider.scala @@ -17,7 +17,7 @@ package kafka.security -import java.util.{List, Properties} +import java.util.{Collection, Properties} import org.apache.kafka.common.security.authenticator.CredentialCache import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism} @@ -25,10 +25,10 @@ import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.common.config.ConfigDef._ import org.apache.kafka.common.security.token.delegation.DelegationTokenCache -class CredentialProvider(saslEnabledMechanisms: List[String], val tokenCache: DelegationTokenCache) { +class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: DelegationTokenCache) { val credentialCache = new CredentialCache - ScramCredentialUtils.createCache(credentialCache, saslEnabledMechanisms) + ScramCredentialUtils.createCache(credentialCache, scramMechanisms) def updateCredentials(username: String, config: Properties) { for (mechanism <- ScramMechanism.values()) { diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 9c85f9b..a95de0a 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -17,20 
+17,22 @@ package kafka.server -import java.nio.charset.StandardCharsets import java.util import java.util.{Collections, Properties} import java.util.concurrent.locks.ReentrantReadWriteLock +import kafka.cluster.EndPoint import kafka.log.{LogCleaner, LogConfig, LogManager} import kafka.server.DynamicBrokerConfig._ -import kafka.utils.{CoreUtils, Logging} +import kafka.utils.{CoreUtils, Logging, PasswordEncoder} import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.common.Reconfigurable import org.apache.kafka.common.config.{ConfigDef, ConfigException, SslConfigs} -import org.apache.kafka.common.network.ListenerReconfigurable import org.apache.kafka.common.metrics.MetricsReporter -import org.apache.kafka.common.utils.{Base64, Utils} +import org.apache.kafka.common.config.types.Password +import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable} +import org.apache.kafka.common.security.authenticator.LoginManager +import org.apache.kafka.common.utils.Utils import scala.collection._ import scala.collection.JavaConverters._ @@ -71,11 +73,7 @@ import scala.collection.JavaConverters._ */ object DynamicBrokerConfig { - private val DynamicPasswordConfigs = Set( - SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, - SslConfigs.SSL_KEY_PASSWORD_CONFIG - ) - private val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala + private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala val AllDynamicConfigs = mutable.Set[String]() AllDynamicConfigs ++= DynamicSecurityConfigs @@ -83,11 +81,17 @@ object DynamicBrokerConfig { AllDynamicConfigs ++= DynamicLogConfig.ReconfigurableConfigs AllDynamicConfigs ++= DynamicThreadPool.ReconfigurableConfigs AllDynamicConfigs ++= Set(KafkaConfig.MetricReporterClassesProp) + AllDynamicConfigs ++= DynamicListenerConfig.ReconfigurableConfigs - private val PerBrokerConfigs = DynamicSecurityConfigs + private val PerBrokerConfigs = DynamicSecurityConfigs ++ + 
DynamicListenerConfig.ReconfigurableConfigs val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r + private[server] val DynamicPasswordConfigs = { + val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet + AllDynamicConfigs.intersect(passwordConfigs) + } def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = { name match { @@ -123,11 +127,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]() private val lock = new ReentrantReadWriteLock private var currentConfig = kafkaConfig + private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret) private[server] def initialize(zkClient: KafkaZkClient): Unit = { val adminZkClient = new AdminZkClient(zkClient) updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default)) - updateBrokerConfig(kafkaConfig.brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)) + val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString) + val brokerConfig = maybeReEncodePasswords(props, adminZkClient) + updateBrokerConfig(kafkaConfig.brokerId, brokerConfig) } def addReconfigurables(kafkaServer: KafkaServer): Unit = { @@ -136,6 +143,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging addBrokerReconfigurable(kafkaServer.logManager.cleaner) addReconfigurable(new DynamicLogConfig(kafkaServer.logManager)) addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer)) + addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer)) } def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) { @@ -187,22 +195,37 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } + private def 
maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = { + secret.map { secret => + new PasswordEncoder(secret, + kafkaConfig.passwordEncoderKeyFactoryAlgorithm, + kafkaConfig.passwordEncoderCipherAlgorithm, + kafkaConfig.passwordEncoderKeyLength, + kafkaConfig.passwordEncoderIterations) + } + } + + private def passwordEncoder: PasswordEncoder = { + dynamicConfigPasswordEncoder.getOrElse(throw new ConfigException("Password encoder secret not configured")) + } + private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = { val props = configProps.clone().asInstanceOf[Properties] - // TODO (KAFKA-6246): encrypt passwords + def encodePassword(configName: String): Unit = { val value = props.getProperty(configName) if (value != null) { if (!perBrokerConfig) throw new ConfigException("Password config can be defined only at broker level") - props.setProperty(configName, Base64.encoder.encodeToString(value.getBytes(StandardCharsets.UTF_8))) + props.setProperty(configName, passwordEncoder.encode(new Password(value))) } } DynamicPasswordConfigs.foreach(encodePassword) props } - private[server] def fromPersistentProps(persistentProps: Properties, perBrokerConfig: Boolean): Properties = { + private[server] def fromPersistentProps(persistentProps: Properties, + perBrokerConfig: Boolean): Properties = { val props = persistentProps.clone().asInstanceOf[Properties] // Remove all invalid configs from `props` @@ -219,17 +242,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging if (!perBrokerConfig) removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored") - // TODO (KAFKA-6246): encrypt passwords def decodePassword(configName: String): Unit = { val value = props.getProperty(configName) if (value != null) { - props.setProperty(configName, new String(Base64.decoder.decode(value), StandardCharsets.UTF_8)) + try { + 
props.setProperty(configName, passwordEncoder.decode(value).value) + } catch { + case e: Exception => + error(s"Dynamic password config $configName could not be decoded, ignoring.", e) + props.remove(configName) + } } } + DynamicPasswordConfigs.foreach(decodePassword) props } + // If the secret has changed, password.encoder.old.secret contains the old secret that was used + // to encode the configs in ZK. Decode passwords using the old secret and update ZK with values + // encoded using the current secret. Ignore any errors during decoding since old secret may not + // have been removed during broker restart. + private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = { + val props = persistentProps.clone().asInstanceOf[Properties] + if (!props.asScala.keySet.exists(DynamicPasswordConfigs.contains)) { + maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder => + DynamicPasswordConfigs.foreach { configName => + val value = props.getProperty(configName) + if (value != null) { + val decoded = try { + Some(passwordDecoder.decode(value).value) + } catch { + case _: Exception => + debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.") + None + } + decoded.foreach { value => props.put(configName, passwordEncoder.encode(new Password(value))) } + } + } + adminZkClient.changeBrokerConfig(Seq(kafkaConfig.brokerId), props) + } + } + props + } + private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) { def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = { if (invalidPropNames.nonEmpty) @@ -331,14 +387,18 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging newProps ++= staticBrokerConfigs overrideProps(newProps, dynamicDefaultConfigs) overrideProps(newProps, dynamicBrokerConfigs) - val newConfig = 
processReconfiguration(newProps, validateOnly = false) + val oldConfig = currentConfig + val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false) if (newConfig ne currentConfig) { currentConfig = newConfig - kafkaConfig.updateCurrentConfig(currentConfig) + kafkaConfig.updateCurrentConfig(newConfig) + + // Process BrokerReconfigurable updates after current config is updated + brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig)) } } - private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): KafkaConfig = { + private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): (KafkaConfig, List[BrokerReconfigurable]) = { val newConfig = new KafkaConfig(newProps.asJava, !validateOnly, None) val updatedMap = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals) if (updatedMap.nonEmpty) { @@ -352,16 +412,22 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix) val updatedKeys = updatedConfigs(newValues, oldValues).keySet if (needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys)) - processReconfigurable(listenerReconfigurable, newValues, customConfigs, validateOnly) + processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly) case reconfigurable => if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet)) - processReconfigurable(reconfigurable, newConfig.valuesFromThisConfig, customConfigs, validateOnly) + processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly) } + + // BrokerReconfigurable updates are processed after config is updated. Only do the validation here. 
+ val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]() brokerReconfigurables.foreach { reconfigurable => - if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet)) - processBrokerReconfigurable(reconfigurable, currentConfig, newConfig, validateOnly) + if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet)) { + reconfigurable.validateReconfiguration(newConfig) + if (!validateOnly) + brokerReconfigurablesToUpdate += reconfigurable + } } - newConfig + (newConfig, brokerReconfigurablesToUpdate.toList) } catch { case e: Exception => if (!validateOnly) @@ -370,7 +436,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } else - currentConfig + (currentConfig, List.empty) } private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String]): Boolean = { @@ -378,27 +444,25 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } private def processReconfigurable(reconfigurable: Reconfigurable, + updatedConfigNames: Set[String], allNewConfigs: util.Map[String, _], newCustomConfigs: util.Map[String, Object], validateOnly: Boolean): Unit = { val newConfigs = new util.HashMap[String, Object] allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) } newConfigs.putAll(newCustomConfigs) - if (validateOnly) { - if (!reconfigurable.validateReconfiguration(newConfigs)) - throw new ConfigException("Validation of dynamic config update failed") - } else - reconfigurable.reconfigure(newConfigs) - } + try { + reconfigurable.validateReconfiguration(newConfigs) + } catch { + case e: ConfigException => throw e + case _: Exception => + throw new ConfigException(s"Validation of dynamic config update of $updatedConfigNames failed with class ${reconfigurable.getClass}") + } - private def processBrokerReconfigurable(reconfigurable: BrokerReconfigurable, - oldConfig: KafkaConfig, - 
newConfig: KafkaConfig, - validateOnly: Boolean): Unit = { - if (validateOnly) - reconfigurable.validateReconfiguration(newConfig) - else - reconfigurable.reconfigure(oldConfig, newConfig) + if (!validateOnly) { + info(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames custom configs: $newCustomConfigs") + reconfigurable.reconfigure(newConfigs) + } } } @@ -427,11 +491,10 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi DynamicLogConfig.ReconfigurableConfigs.asJava } - override def validateReconfiguration(configs: util.Map[String, _]): Boolean = { + override def validateReconfiguration(configs: util.Map[String, _]): Unit = { // For update of topic config overrides, only config names and types are validated // Names and types have already been validated. For consistency with topic config // validation, no additional validation is performed. - true } override def reconfigure(configs: util.Map[String, _]): Unit = { @@ -538,7 +601,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf configs } - override def validateReconfiguration(configs: util.Map[String, _]): Boolean = { + override def validateReconfiguration(configs: util.Map[String, _]): Unit = { val updatedMetricsReporters = metricsReporterClasses(configs) // Ensure all the reporter classes can be loaded and have a default constructor @@ -548,10 +611,11 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf } // Validate the new configuration using every reconfigurable reporter instance that is not being deleted - currentReporters.values.forall { + currentReporters.values.foreach { case reporter: Reconfigurable => - !updatedMetricsReporters.contains(reporter.getClass.getName) || reporter.validateReconfiguration(configs) - case _ => true + if (updatedMetricsReporters.contains(reporter.getClass.getName)) + reporter.validateReconfiguration(configs) + case _ => } } @@ -588,3 +652,103 @@ class 
DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala } } +object DynamicListenerConfig { + + val ReconfigurableConfigs = Set( + // Listener configs + KafkaConfig.AdvertisedListenersProp, + KafkaConfig.ListenersProp, + KafkaConfig.ListenerSecurityProtocolMapProp, + + // SSL configs + KafkaConfig.PrincipalBuilderClassProp, + KafkaConfig.SslProtocolProp, + KafkaConfig.SslProviderProp, + KafkaConfig.SslCipherSuitesProp, + KafkaConfig.SslEnabledProtocolsProp, + KafkaConfig.SslKeystoreTypeProp, + KafkaConfig.SslKeystoreLocationProp, + KafkaConfig.SslKeystorePasswordProp, + KafkaConfig.SslKeyPasswordProp, + KafkaConfig.SslTruststoreTypeProp, + KafkaConfig.SslTruststoreLocationProp, + KafkaConfig.SslTruststorePasswordProp, + KafkaConfig.SslKeyManagerAlgorithmProp, + KafkaConfig.SslTrustManagerAlgorithmProp, + KafkaConfig.SslEndpointIdentificationAlgorithmProp, + KafkaConfig.SslSecureRandomImplementationProp, + KafkaConfig.SslClientAuthProp, + + // SASL configs + KafkaConfig.SaslMechanismInterBrokerProtocolProp, + KafkaConfig.SaslJaasConfigProp, + KafkaConfig.SaslEnabledMechanismsProp, + KafkaConfig.SaslKerberosServiceNameProp, + KafkaConfig.SaslKerberosKinitCmdProp, + KafkaConfig.SaslKerberosTicketRenewWindowFactorProp, + KafkaConfig.SaslKerberosTicketRenewJitterProp, + KafkaConfig.SaslKerberosMinTimeBeforeReloginProp, + KafkaConfig.SaslKerberosPrincipalToLocalRulesProp + ) +} + +class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable with Logging { + + override def reconfigurableConfigs: Set[String] = { + DynamicListenerConfig.ReconfigurableConfigs + } + + def validateReconfiguration(newConfig: KafkaConfig): Unit = { + + def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = { + newConfig.originals.asScala + .filterKeys(_.startsWith(prefix)) + .filterKeys(k => !DynamicSecurityConfigs.contains(k)) + } 
+ + val oldConfig = server.config + val newListeners = listenersToMap(newConfig.listeners) + val newAdvertisedListeners = listenersToMap(newConfig.advertisedListeners) + val oldListeners = listenersToMap(oldConfig.listeners) + if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet)) + throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'") + if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet) + throw new ConfigException(s"Listeners '$newListeners' and listener map '${newConfig.listenerSecurityProtocolMap}' don't match") + newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName => + val prefix = listenerName.configPrefix + val newListenerProps = immutableListenerConfigs(newConfig, prefix) + val oldListenerProps = immutableListenerConfigs(oldConfig, prefix) + if (newListenerProps != oldListenerProps) + throw new ConfigException(s"Configs cannot be updated dynamically for existing listener $listenerName, " + + "restart broker or create a new listener for update") + if (oldConfig.listenerSecurityProtocolMap(listenerName) != newConfig.listenerSecurityProtocolMap(listenerName)) + throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName") + } + if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName)) + throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}") + } + + def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { + val newListeners = newConfig.listeners + val newListenerMap = listenersToMap(newListeners) + val oldListeners = oldConfig.listeners + val oldListenerMap = listenersToMap(oldListeners) + val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName)) + val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName)) + + // Clear SASL login 
cache to force re-login + if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty) + LoginManager.closeAll() + + server.socketServer.removeListeners(listenersRemoved) + if (listenersAdded.nonEmpty) + server.socketServer.addListeners(listenersAdded) + + server.zkClient.updateBrokerInfoInZk(server.createBrokerInfo) + } + + private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] = + listeners.map(e => (e.listenerName, e)).toMap + +} + diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2dd6951..1f448af 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1291,7 +1291,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleSaslHandshakeRequest(request: RequestChannel.Request) { - sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)) + sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet())) } def handleSaslAuthenticateRequest(request: RequestChannel.Request) { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 144dd65..64698f7 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -223,6 +223,11 @@ object Defaults { val DelegationTokenMaxLifeTimeMsDefault = 7 * 24 * 60 * 60 * 1000L val DelegationTokenExpiryTimeMsDefault = 24 * 60 * 60 * 1000L val DelegationTokenExpiryCheckIntervalMsDefault = 1 * 60 * 60 * 1000L + + /** ********* Password encryption configuration for dynamic configs *********/ + val PasswordEncoderCipherAlgorithm = "AES/CBC/PKCS5Padding" + val PasswordEncoderKeyLength = 128 + val PasswordEncoderIterations = 4096 } object KafkaConfig { @@ -412,6 +417,7 @@ object KafkaConfig { /** ********* SASL Configuration 
****************/ val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol" + val SaslJaasConfigProp = SaslConfigs.SASL_JAAS_CONFIG val SaslEnabledMechanismsProp = BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME val SaslKerberosKinitCmdProp = SaslConfigs.SASL_KERBEROS_KINIT_CMD @@ -426,6 +432,14 @@ object KafkaConfig { val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms" val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms" + /** ********* Password encryption configuration for dynamic configs *********/ + val PasswordEncoderSecretProp = "password.encoder.secret" + val PasswordEncoderOldSecretProp = "password.encoder.old.secret" + val PasswordEncoderKeyFactoryAlgorithmProp = "password.encoder.keyfactory.algorithm" + val PasswordEncoderCipherAlgorithmProp = "password.encoder.cipher.algorithm" + val PasswordEncoderKeyLengthProp = "password.encoder.key.length" + val PasswordEncoderIterationsProp = "password.encoder.iterations" + /* Documentation */ /** ********* Zookeeper Configuration ***********/ val ZkConnectDoc = "Zookeeper host string" @@ -689,6 +703,7 @@ object KafkaConfig { /** ********* Sasl Configuration ****************/ val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI." + val SaslJaasConfigDoc = SaslConfigs.SASL_JAAS_CONFIG_DOC val SaslEnabledMechanismsDoc = SaslConfigs.SASL_ENABLED_MECHANISMS_DOC val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC val SaslKerberosKinitCmdDoc = SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC @@ -704,6 +719,16 @@ object KafkaConfig { val DelegationTokenExpiryTimeMsDoc = "The token validity time in seconds before the token needs to be renewed. Default value 1 day." val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens." 
+ /** ********* Password encryption configuration for dynamic configs *********/ + val PasswordEncoderSecretDoc = "The secret used for encoding dynamically configured passwords for this broker." + val PasswordEncoderOldSecretDoc = "The old secret that was used for encoding dynamically configured passwords. " + + "This is required only when the secret is updated. If specified, all dynamically encoded passwords are " + + s"decoded using this old secret and re-encoded using $PasswordEncoderSecretProp when broker starts up." + val PasswordEncoderKeyFactoryAlgorithmDoc = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " + + "Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise." + val PasswordEncoderCipherAlgorithmDoc = "The Cipher algorithm used for encoding dynamically configured passwords." + val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords." + val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords." 
private val configDef = { import ConfigDef.Importance._ @@ -898,6 +923,7 @@ object KafkaConfig { /** ********* Sasl Configuration ****************/ .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc) + .define(SaslJaasConfigProp, PASSWORD, null, MEDIUM, SaslJaasConfigDoc) .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, MEDIUM, SaslEnabledMechanismsDoc) .define(SaslKerberosServiceNameProp, STRING, null, MEDIUM, SaslKerberosServiceNameDoc) .define(SaslKerberosKinitCmdProp, STRING, Defaults.SaslKerberosKinitCmd, MEDIUM, SaslKerberosKinitCmdDoc) @@ -911,6 +937,13 @@ object KafkaConfig { .define(DelegationTokenExpiryTimeMsProp, LONG, Defaults.DelegationTokenExpiryTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenExpiryTimeMsDoc) .define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DelegationTokenExpiryCheckIntervalMsDefault, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc) + /** ********* Password encryption configuration for dynamic configs *********/ + .define(PasswordEncoderSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderSecretDoc) + .define(PasswordEncoderOldSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderOldSecretDoc) + .define(PasswordEncoderKeyFactoryAlgorithmProp, STRING, null, LOW, PasswordEncoderKeyFactoryAlgorithmDoc) + .define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc) + .define(PasswordEncoderKeyLengthProp, INT, Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc) + .define(PasswordEncoderIterationsProp, INT, Defaults.PasswordEncoderIterations, atLeast(1024), LOW, PasswordEncoderIterationsDoc) } def configNames() = configDef.names().asScala.toList.sorted @@ -942,9 +975,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO def this(props: java.util.Map[_, _]) = this(props, true, 
None) def this(props: java.util.Map[_, _], doLog: Boolean) = this(props, doLog, None) - private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this)) // Cache the current config to avoid acquiring read lock to access from dynamicConfig @volatile private var currentConfig = this + private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this)) private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = { this.currentConfig = newConfig @@ -1076,8 +1109,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp) def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp) - val (interBrokerListenerName, interBrokerSecurityProtocol) = getInterBrokerListenerNameAndSecurityProtocol - // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0` // is passed, `0.10.0-IV0` may be picked) val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp) @@ -1120,32 +1151,21 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp) val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp) - /** ********* SSL Configuration **************/ - val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp) - val sslProtocol = getString(KafkaConfig.SslProtocolProp) - val sslProvider = getString(KafkaConfig.SslProviderProp) - val sslEnabledProtocols = getList(KafkaConfig.SslEnabledProtocolsProp) - def sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp) - def sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp) - def sslKeystorePassword = getPassword(KafkaConfig.SslKeystorePasswordProp) - def 
sslKeyPassword = getPassword(KafkaConfig.SslKeyPasswordProp) - val sslTruststoreType = getString(KafkaConfig.SslTruststoreTypeProp) - val sslTruststoreLocation = getString(KafkaConfig.SslTruststoreLocationProp) - val sslTruststorePassword = getPassword(KafkaConfig.SslTruststorePasswordProp) - val sslKeyManagerAlgorithm = getString(KafkaConfig.SslKeyManagerAlgorithmProp) - val sslTrustManagerAlgorithm = getString(KafkaConfig.SslTrustManagerAlgorithmProp) - val sslClientAuth = getString(KafkaConfig.SslClientAuthProp) - val sslCipher = getList(KafkaConfig.SslCipherSuitesProp) - - /** ********* Sasl Configuration **************/ - val saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp) - val saslEnabledMechanisms = getList(KafkaConfig.SaslEnabledMechanismsProp) - val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp) - val saslKerberosKinitCmd = getString(KafkaConfig.SaslKerberosKinitCmdProp) - val saslKerberosTicketRenewWindowFactor = getDouble(KafkaConfig.SaslKerberosTicketRenewWindowFactorProp) - val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp) - val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp) - val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp) + /** ********* SSL/SASL Configuration **************/ + // Security configs may be overridden for listeners, so it is not safe to use the base values + // Hence the base SSL/SASL configs are not fields of KafkaConfig, listener configs should be + // retrieved using KafkaConfig#valuesWithPrefixOverride + private def saslEnabledMechanisms(listenerName: ListenerName): Set[String] = { + val value = valuesWithPrefixOverride(listenerName.configPrefix).get(KafkaConfig.SaslEnabledMechanismsProp) + if (value != null) + value.asInstanceOf[util.List[String]].asScala.toSet + else + Set.empty[String] + } + + def 
interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1 + def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2 + def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp) val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1 /** ********* DelegationToken Configuration **************/ @@ -1155,6 +1175,14 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp) val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp) + /** ********* Password encryption configuration for dynamic configs *********/ + def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp)) + def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp)) + def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp) + def passwordEncoderKeyFactoryAlgorithm = Option(getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp)) + def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp) + def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp) + /** ********* Quota Configuration **************/ val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp) val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp) @@ -1170,9 +1198,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp) def compressionType = getString(KafkaConfig.CompressionTypeProp) - val listeners: Seq[EndPoint] = getListeners - val advertisedListeners: Seq[EndPoint] = getAdvertisedListeners - private[kafka] lazy val 
listenerSecurityProtocolMap = getListenerSecurityProtocolMap def addReconfigurable(reconfigurable: Reconfigurable): Unit = { dynamicConfig.addReconfigurable(reconfigurable) @@ -1203,7 +1228,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO // If the user did not define listeners but did define host or port, let's use them in backward compatible way // If none of those are defined, we default to PLAINTEXT://:9092 - private def getListeners: Seq[EndPoint] = { + def listeners: Seq[EndPoint] = { Option(getString(KafkaConfig.ListenersProp)).map { listenerProp => CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap) }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap)) @@ -1212,14 +1237,14 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO // If the user defined advertised listeners, we use those // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults // If none of these are defined, we'll use the listeners - private def getAdvertisedListeners: Seq[EndPoint] = { + def advertisedListeners: Seq[EndPoint] = { val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp) if (advertisedListenersProp != null) CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap) else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap) else - getListeners + listeners } private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = { @@ -1248,7 +1273,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO } } - private def getListenerSecurityProtocolMap: Map[ListenerName, 
SecurityProtocol] = { + def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = { getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp)) .map { case (listenerName, protocolName) => ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp) @@ -1295,7 +1320,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM, s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString") - require(!interBrokerUsesSasl || saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol), + require(!interBrokerUsesSasl || saslEnabledMechanisms(interBrokerListenerName).contains(saslMechanismInterBrokerProtocol), s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker communication") require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes, s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}") diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 747a0df..0212181 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -44,6 +44,7 @@ import org.apache.kafka.common.network._ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse} import 
org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramMechanism import org.apache.kafka.common.security.token.delegation.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time} @@ -236,8 +237,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP logManager.startup() metadataCache = new MetadataCache(config.brokerId) - tokenCache = new DelegationTokenCache(config.saslEnabledMechanisms) - credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, tokenCache) + // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. + // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. + tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) + credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) socketServer = new SocketServer(config, metrics, time, credentialProvider) socketServer.startup() @@ -366,7 +369,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64)) } - private def createBrokerInfo: BrokerInfo = { + private[server] def createBrokerInfo: BrokerInfo = { val listeners = config.advertisedListeners.map { endpoint => if (endpoint.port == 0) endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala new file mode 100644 index 0000000..ff11e24 --- /dev/null +++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package kafka.utils + +import java.nio.charset.StandardCharsets +import java.security.{AlgorithmParameters, NoSuchAlgorithmException, SecureRandom} +import java.security.spec.AlgorithmParameterSpec +import javax.crypto.{Cipher, SecretKeyFactory} +import javax.crypto.spec._ + +import kafka.utils.PasswordEncoder._ +import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.config.types.Password +import org.apache.kafka.common.utils.Base64 + +import scala.collection.Map + +object PasswordEncoder { + val KeyFactoryAlgorithmProp = "keyFactoryAlgorithm" + val CipherAlgorithmProp = "cipherAlgorithm" + val InitializationVectorProp = "initializationVector" + val KeyLengthProp = "keyLength" + val SaltProp = "salt" + val IterationsProp = "iterations" + val EncyrptedPasswordProp = "encryptedPassword" + val PasswordLengthProp = "passwordLength" +} + +/** + * Password encoder and decoder implementation. Encoded passwords are persisted as a CSV map + * containing the encoded password in base64 and along with the properties used for encryption. + * + * @param secret The secret used for encoding and decoding + * @param keyFactoryAlgorithm Key factory algorithm if configured. By default, PBKDF2WithHmacSHA512 is + * used if available, PBKDF2WithHmacSHA1 otherwise. 
+ * @param cipherAlgorithm Cipher algorithm used for encoding. + * @param keyLength Key length used for encoding. This should be valid for the specified algorithms. + * @param iterations Iteration count used for encoding. + * + * The provided `keyFactoryAlgorithm`, 'cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords. + * The values used for encoding are stored along with the encoded password and the stored values are used for decoding. + * + */ +class PasswordEncoder(secret: Password, + keyFactoryAlgorithm: Option[String], + cipherAlgorithm: String, + keyLength: Int, + iterations: Int) extends Logging { + + private val secureRandom = new SecureRandom + private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm) + + def encode(password: Password): String = { + val salt = new Array[Byte](256) + secureRandom.nextBytes(salt) + val cipher = Cipher.getInstance(cipherAlgorithm) + val keyFactory = secretKeyFactory(keyFactoryAlgorithm) + val keySpec = secretKeySpec(keyFactory, cipherAlgorithm, keyLength, salt, iterations) + cipher.init(Cipher.ENCRYPT_MODE, keySpec) + val encryptedPassword = cipher.doFinal(password.value.getBytes(StandardCharsets.UTF_8)) + val encryptedMap = Map( + KeyFactoryAlgorithmProp -> keyFactory.getAlgorithm, + CipherAlgorithmProp -> cipherAlgorithm, + KeyLengthProp -> keyLength, + SaltProp -> base64Encode(salt), + IterationsProp -> iterations.toString, + EncyrptedPasswordProp -> base64Encode(encryptedPassword), + PasswordLengthProp -> password.value.length + ) ++ cipherParamsEncoder.toMap(cipher.getParameters) + encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",") + } + + def decode(encodedPassword: String): Password = { + val params = CoreUtils.parseCsvMap(encodedPassword) + val keyFactoryAlg = params(KeyFactoryAlgorithmProp) + val cipherAlg = params(CipherAlgorithmProp) + val keyLength = params(KeyLengthProp).toInt + val salt = base64Decode(params(SaltProp)) + val iterations = 
params(IterationsProp).toInt + val encryptedPassword = base64Decode(params(EncyrptedPasswordProp)) + val passwordLengthProp = params(PasswordLengthProp).toInt + val cipher = Cipher.getInstance(cipherAlg) + val keyFactory = secretKeyFactory(Some(keyFactoryAlg)) + val keySpec = secretKeySpec(keyFactory, cipherAlg, keyLength, salt, iterations) + cipher.init(Cipher.DECRYPT_MODE, keySpec, cipherParamsEncoder.toParameterSpec(params)) + val password = try { + val decrypted = cipher.doFinal(encryptedPassword) + new String(decrypted, StandardCharsets.UTF_8) + } catch { + case e: Exception => throw new ConfigException("Password could not be decoded", e) + } + if (password.length != passwordLengthProp) // Sanity check + throw new ConfigException("Password could not be decoded, sanity check of length failed") + new Password(password) + } + + private def secretKeyFactory(keyFactoryAlg: Option[String]): SecretKeyFactory = { + keyFactoryAlg match { + case Some(algorithm) => SecretKeyFactory.getInstance(algorithm) + case None => + try { + SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512") + } catch { + case _: NoSuchAlgorithmException => SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1") + } + } + } + + private def secretKeySpec(keyFactory: SecretKeyFactory, + cipherAlg: String, + keyLength: Int, + salt: Array[Byte], iterations: Int): SecretKeySpec = { + val keySpec = new PBEKeySpec(secret.value.toCharArray, salt, iterations, keyLength) + val algorithm = if (cipherAlg.indexOf('/') > 0) cipherAlg.substring(0, cipherAlg.indexOf('/')) else cipherAlg + new SecretKeySpec(keyFactory.generateSecret(keySpec).getEncoded, algorithm) + } + + private def base64Encode(bytes: Array[Byte]): String = Base64.encoder.encodeToString(bytes) + + private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.decoder.decode(encoded) + + private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = { + val aesPattern = "AES/(.*)/.*".r + cipherAlgorithm match { + case 
aesPattern("GCM") => new GcmParamsEncoder + case _ => new IvParamsEncoder + } + } + + private trait CipherParamsEncoder { + def toMap(cipher: AlgorithmParameters): Map[String, String] + def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec + } + + private class IvParamsEncoder extends CipherParamsEncoder { + def toMap(cipherParams: AlgorithmParameters): Map[String, String] = { + if (cipherParams != null) { + val ivSpec = cipherParams.getParameterSpec(classOf[IvParameterSpec]) + Map(InitializationVectorProp -> base64Encode(ivSpec.getIV)) + } else + throw new IllegalStateException("Could not determine initialization vector for cipher") + } + def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = { + new IvParameterSpec(base64Decode(paramMap(InitializationVectorProp))) + } + } + + private class GcmParamsEncoder extends CipherParamsEncoder { + def toMap(cipherParams: AlgorithmParameters): Map[String, String] = { + if (cipherParams != null) { + val spec = cipherParams.getParameterSpec(classOf[GCMParameterSpec]) + Map(InitializationVectorProp -> base64Encode(spec.getIV), + "authenticationTagLength" -> spec.getTLen.toString) + } else + throw new IllegalStateException("Could not determine initialization vector for cipher") + } + def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = { + new GCMParameterSpec(paramMap("authenticationTagLength").toInt, base64Decode(paramMap(InitializationVectorProp))) + } + } +} diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index d683a8d..b545455 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -82,6 +82,13 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}") } + def updateBrokerInfoInZk(brokerInfo: 
BrokerInfo): Unit = { + val brokerIdPath = brokerInfo.path + val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion) + retryRequestUntilConnected(setDataRequest) + info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints)) + } + /** * Gets topic partition states for the given partitions. * @param partitions the partitions for which we want ot get states. diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 6f45bca..867e03d 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -430,13 +430,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name) assertFalse(listenerSecurityProtocolMap.isDefault) assertFalse(listenerSecurityProtocolMap.isSensitive) - assertTrue(listenerSecurityProtocolMap.isReadOnly) + assertFalse(listenerSecurityProtocolMap.isReadOnly) val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp) assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name) assertNull(truststorePassword.value) assertFalse(truststorePassword.isDefault) assertTrue(truststorePassword.isSensitive) - assertTrue(truststorePassword.isReadOnly) + assertFalse(truststorePassword.isReadOnly) val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp) assertEquals(servers(1).config.compressionType.toString, compressionType.value) assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name) diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 
f459252..273b247 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -59,12 +59,7 @@ trait SaslSetup { case _ => false }) if (hasKerberos) { - val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles() - kdc = new MiniKdc(kdcConf, workDir) - kdc.start() - kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost") - kdc.createPrincipal(clientKeytabFile, - JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2) + initializeKerberos() } writeJaasConfigurationToFile(jaasSections) val hasZk = jaasSections.exists(_.modules.exists { @@ -75,6 +70,15 @@ trait SaslSetup { System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") } + protected def initializeKerberos(): Unit = { + val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles() + kdc = new MiniKdc(kdcConf, workDir) + kdc.start() + kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost") + kdc.createPrincipal(clientKeytabFile, + JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2) + } + /** Return a tuple with the path to the server keytab file and client keytab file */ protected def maybeCreateEmptyKeytabFiles(): (File, File) = { if (serverKeytabFile.isEmpty) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 1224274..b7f0ae8 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -18,28 +18,29 @@ package kafka.server -import java.io.{Closeable, File, FileOutputStream, FileWriter} +import 
java.io.{Closeable, File, FileWriter} import java.nio.file.{Files, StandardCopyOption} import java.lang.management.ManagementFactory import java.util import java.util.{Collections, Properties} -import java.util.concurrent.{ConcurrentLinkedQueue, ExecutionException, TimeUnit} +import java.util.concurrent._ import javax.management.ObjectName import kafka.admin.ConfigCommand -import kafka.api.SaslSetup -import kafka.log.LogConfig +import kafka.api.{KafkaSasl, SaslSetup} import kafka.coordinator.group.OffsetConfig +import kafka.log.LogConfig import kafka.message.ProducerCompressionCodec -import kafka.utils.{ShutdownableThread, TestUtils} +import kafka.utils._ import kafka.utils.Implicits._ import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness} +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition} -import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.config.{ConfigException, ConfigResource, SslConfigs} import org.apache.kafka.common.config.SslConfigs._ import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException} @@ -55,6 +56,7 @@ import org.junit.{After, Before, Test} import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConverters._ +import scala.collection.Seq object DynamicBrokerReconfigurationTest { val SecureInternal = "INTERNAL" @@ -65,12 +67,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet import DynamicBrokerReconfigurationTest._ - private var servers = 
new ArrayBuffer[KafkaServer] + private val servers = new ArrayBuffer[KafkaServer] private val numServers = 3 private val producers = new ArrayBuffer[KafkaProducer[String, String]] private val consumers = new ArrayBuffer[KafkaConsumer[String, String]] private val adminClients = new ArrayBuffer[AdminClient]() private val clientThreads = new ArrayBuffer[ShutdownableThread]() + private val executors = new ArrayBuffer[ExecutorService] private val topic = "testtopic" private val kafkaClientSaslMechanism = "PLAIN" @@ -95,10 +98,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0") props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL") props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal) + props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN") props.put(KafkaConfig.ZkEnableSecureAclsProp, "true") props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(",")) props.put(KafkaConfig.LogSegmentBytesProp, "2000") props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") + props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") + props.put(KafkaConfig.PasswordEncoderOldSecretProp, "old-dynamic-config-secret") props ++= sslProperties1 addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal) @@ -127,8 +133,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet clientThreads.foreach(_.interrupt()) clientThreads.foreach(_.initiateShutdown()) clientThreads.foreach(_.join(5 * 1000)) - producers.foreach(_.close()) - consumers.foreach(_.close()) + executors.foreach(_.shutdownNow()) + producers.foreach(_.close(0, TimeUnit.MILLISECONDS)) + consumers.foreach(_.close(0, TimeUnit.MILLISECONDS)) adminClients.foreach(_.close()) TestUtils.shutdownServers(servers) super.tearDown() @@ 
-514,13 +521,217 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet stopAndVerifyProduceConsume(producerThread, consumerThread) } + @Test + def testAdvertisedListenerUpdate(): Unit = { + val adminClient = adminClients.head + val externalAdminClient = createAdminClient(SecurityProtocol.SASL_SSL, SecureExternal) + + // Ensure connections are made to brokers before external listener is made inaccessible + describeConfig(externalAdminClient) + + // Update broker keystore for external listener to use invalid listener address + // any address other than localhost is sufficient to fail (either connection or host name verification failure) + val invalidHost = "192.168.0.1" + alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost) + + // Verify that producer connections fail since advertised listener is invalid + val bootstrap = bootstrapServers.replaceAll(invalidHost, "localhost") // allow bootstrap connection to succeed + val producer1 = createProducer(trustStoreFile1, retries = 0, bootstrap = bootstrap) + + val sendFuture = verifyConnectionFailure(producer1) + + alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost") + + // Verify that produce/consume work now + val producer = createProducer(trustStoreFile1, retries = 0) + val consumer = createConsumer("group2", trustStoreFile1, topic) + verifyProduceConsume(producer, consumer, 10, topic) + + // Verify updating inter-broker listener + val props = new Properties + props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal) + try { + reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.InterBrokerListenerNameProp, SecureExternal)) + fail("Inter-broker listener cannot be dynamically updated") + } catch { + case e: ExecutionException => + assertTrue(s"Unexpected exception ${e.getCause}", e.getCause.isInstanceOf[InvalidRequestException]) + servers.foreach(server => assertEquals(SecureInternal, 
server.config.interBrokerListenerName.value)) + } + + // Verify that the other send did not complete + verifyTimeout(sendFuture) + } + + @Test + def testAddRemoveSslListener(): Unit = { + verifyAddListener("SSL", SecurityProtocol.SSL, Seq.empty) + + // Restart servers and check secret rotation + servers.foreach(_.shutdown()) + servers.foreach(_.awaitShutdown()) + adminClients.foreach(_.close()) + adminClients.clear() + + // All passwords are currently encoded with password.encoder.secret. Encode with password.encoder.old.secret + // and update ZK. When each server is started, it should decode using password.encoder.old.secret and update + // ZK with newly encoded values using password.encoder.secret. + servers.foreach { server => + val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString) + val config = server.config + val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password encoder secret not configured")) + val oldSecret = config.passwordEncoderOldSecret.getOrElse(throw new IllegalStateException("Password encoder old secret not configured")) + val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.DynamicPasswordConfigs.contains) + val passwordDecoder = new PasswordEncoder(secret, + config.passwordEncoderKeyFactoryAlgorithm, + config.passwordEncoderCipherAlgorithm, + config.passwordEncoderKeyLength, + config.passwordEncoderIterations) + val passwordEncoder = new PasswordEncoder(oldSecret, + config.passwordEncoderKeyFactoryAlgorithm, + config.passwordEncoderCipherAlgorithm, + config.passwordEncoderKeyLength, + config.passwordEncoderIterations) + passwordConfigs.foreach { case (name, value) => + val decoded = passwordDecoder.decode(value).value + props.put(name, passwordEncoder.encode(new Password(decoded))) + } + val brokerId = server.config.brokerId + adminZkClient.changeBrokerConfig(Seq(brokerId), props) + val updatedProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, 
brokerId.toString) + passwordConfigs.foreach { case (name, value) => assertNotEquals(value, updatedProps.get(name)) } + + server.startup() + TestUtils.retry(10000) { + val newProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString) + passwordConfigs.foreach { case (name, value) => assertEquals(value, newProps.get(name)) } + } + } + + verifyListener(SecurityProtocol.SSL, None) + createAdminClient(SecurityProtocol.SSL, SecureInternal) + verifyRemoveListener("SSL", SecurityProtocol.SSL, Seq.empty) + } + + @Test + def testAddRemoveSaslListeners(): Unit = { + createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) + createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) + initializeKerberos() + + //verifyAddListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN")) + verifyAddListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI")) + //verifyRemoveListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN")) + verifyRemoveListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI")) + } + + private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol, + saslMechanisms: Seq[String]): Unit = { + val config = servers.head.config + val existingListenerCount = config.listeners.size + val listeners = config.listeners + .map(e => s"${e.listenerName.value}://${e.host}:${e.port}") + .mkString(",") + s",$listenerName://localhost:0" + val listenerMap = config.listenerSecurityProtocolMap + .map { case (name, protocol) => s"${name.value}:${protocol.name}" } + .mkString(",") + s",$listenerName:${securityProtocol.name}" + + val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString) + props.put(KafkaConfig.ListenersProp, listeners) + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap) + securityProtocol 
match { + case SecurityProtocol.SSL => + addListenerPropsSsl(listenerName, props) + case SecurityProtocol.SASL_PLAINTEXT => + addListenerPropsSasl(listenerName, saslMechanisms, props) + case SecurityProtocol.SASL_SSL => + addListenerPropsSasl(listenerName, saslMechanisms, props) + addListenerPropsSsl(listenerName, props) + case SecurityProtocol.PLAINTEXT => // no additional props + } + + alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get + + TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1), + "Listener config not updated") + TestUtils.waitUntilTrue(() => servers.forall(server => { + try { + server.socketServer.boundPort(new ListenerName(listenerName)) > 0 + } catch { + case _: Exception => false + } + }), "Listener not created") + + if (saslMechanisms.nonEmpty) + saslMechanisms.foreach(mechanism => verifyListener(securityProtocol, Some(mechanism))) + else + verifyListener(securityProtocol, None) + } + + private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol, + saslMechanisms: Seq[String]): Unit = { + val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head + val producer1 = createProducer(listenerName, securityProtocol, saslMechanism) + val consumer1 = createConsumer(listenerName, securityProtocol, saslMechanism, + s"remove-listener-group-$securityProtocol") + verifyProduceConsume(producer1, consumer1, numRecords = 10, topic) + // send another message to check consumer later + producer1.send(new ProducerRecord(topic, "key", "value")).get(100, TimeUnit.MILLISECONDS) + + val config = servers.head.config + val existingListenerCount = config.listeners.size + val listeners = config.listeners + .filter(e => e.listenerName.value != securityProtocol.name) + .map(e => s"${e.listenerName.value}://${e.host}:${e.port}") + .mkString(",") + val listenerMap = config.listenerSecurityProtocolMap + .filterKeys(listenerName => listenerName.value 
!= securityProtocol.name) + .map { case (listenerName, protocol) => s"${listenerName.value}:${protocol.name}" } + .mkString(",") + + val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString) + val listenerProps = props.asScala.keySet.filter(_.startsWith(new ListenerName(listenerName).configPrefix)) + listenerProps.foreach(props.remove) + props.put(KafkaConfig.ListenersProp, listeners) + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap) + alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get + + TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1), + "Listeners not updated") + + // Test that connections using deleted listener don't work + val producerFuture = verifyConnectionFailure(producer1) + val consumerFuture = verifyConnectionFailure(consumer1) + + // Test that other listeners still work + val producer2 = createProducer(trustStoreFile1, retries = 0) + val consumer2 = createConsumer(s"remove-listener-group2-$securityProtocol", trustStoreFile1, topic, autoOffsetReset = "latest") + verifyProduceConsume(producer2, consumer2, numRecords = 10, topic) + + // Verify that producer/consumer using old listener don't work + verifyTimeout(producerFuture) + verifyTimeout(consumerFuture) + } + + private def verifyListener(securityProtocol: SecurityProtocol, saslMechanism: Option[String]): Unit = { + val mechanism = saslMechanism.getOrElse("") + val producer = createProducer(securityProtocol.name, securityProtocol, mechanism) + val consumer = createConsumer(securityProtocol.name, securityProtocol, mechanism, + s"add-listener-group-$securityProtocol-$mechanism") + verifyProduceConsume(producer, consumer, numRecords = 10, topic) + } + + private def bootstrapServers: String = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal)) + private def createProducer(trustStore: File, retries: Int, - clientId: String = "test-producer"): 
KafkaProducer[String, String] = { - val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal)) + clientId: String = "test-producer", + bootstrap: String = bootstrapServers, + securityProtocol: SecurityProtocol = SecurityProtocol.SASL_SSL): KafkaProducer[String, String] = { val propsOverride = new Properties propsOverride.put(ProducerConfig.CLIENT_ID_CONFIG, clientId) + propsOverride.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS") val producer = TestUtils.createNewProducer( - bootstrapServers, + bootstrap, acks = -1, retries = retries, securityProtocol = SecurityProtocol.SASL_SSL, @@ -533,17 +744,72 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet producer } - private def createConsumer(groupId: String, trustStore: File, topic: String = topic):KafkaConsumer[String, String] = { - val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal)) + private def createConsumer(groupId: String, trustStore: File, + topic: String = topic, + bootstrap: String = bootstrapServers, + securityProtocol: SecurityProtocol = SecurityProtocol.SASL_SSL, + autoOffsetReset: String = "earliest"):KafkaConsumer[String, String] = { + val propsOverride = new Properties + propsOverride.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS") val consumer = TestUtils.createNewConsumer( - bootstrapServers, + bootstrap, groupId, - securityProtocol = SecurityProtocol.SASL_SSL, + autoOffsetReset = autoOffsetReset, + securityProtocol = securityProtocol, trustStoreFile = Some(trustStore), saslProperties = Some(clientSaslProps), keyDeserializer = new StringDeserializer, valueDeserializer = new StringDeserializer) consumer.subscribe(Collections.singleton(topic)) + if (autoOffsetReset == "latest") { + do { + consumer.poll(1) + } while (consumer.assignment.isEmpty) + } + consumers += consumer + consumer + } + + private def clientProps(securityProtocol: SecurityProtocol, 
saslMechanism: String): Properties = { + val props = new Properties + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name) + props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS") + val saslProps = if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) { + Some(kafkaClientSaslProperties(saslMechanism, dynamicJaasConfig = true)) + } else + None + val securityProps = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, + Some(trustStoreFile1), "client", TestUtils.SslCertificateCn, saslProps) + props ++= securityProps + props + } + + private def createProducer(listenerName: String, securityProtocol: SecurityProtocol, + saslMechanism: String): KafkaProducer[String, String] = { + val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName)) + val producer = TestUtils.createNewProducer(bootstrapServers, + acks = -1, retries = 0, + securityProtocol = securityProtocol, + keySerializer = new StringSerializer, + valueSerializer = new StringSerializer, + props = Some(clientProps(securityProtocol, saslMechanism))) + producers += producer + producer + } + + private def createConsumer(listenerName: String, securityProtocol: SecurityProtocol, + saslMechanism: String, group: String): KafkaConsumer[String, String] = { + val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName)) + val consumer = TestUtils.createNewConsumer(bootstrapServers, group, + autoOffsetReset = "latest", + securityProtocol = securityProtocol, + keyDeserializer = new StringDeserializer, + valueDeserializer = new StringDeserializer, + props = Some(clientProps(securityProtocol, saslMechanism))) + consumer.subscribe(Collections.singleton(topic)) + do { + consumer.poll(1) + } while (consumer.assignment.isEmpty) consumers += consumer consumer } @@ -552,6 +818,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet val config = 
new util.HashMap[String, Object] val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName)) config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10") val securityProps: util.Map[Object, Object] = TestUtils.adminClientSecurityConfigs(securityProtocol, Some(trustStoreFile1), Some(clientSaslProps)) securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) } @@ -563,7 +830,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet private def verifyProduceConsume(producer: KafkaProducer[String, String], consumer: KafkaConsumer[String, String], numRecords: Int, - topic: String = topic): Unit = { + topic: String): Unit = { val producerRecords = (1 to numRecords).map(i => new ProducerRecord(topic, s"key$i", s"value$i")) producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS)) @@ -639,6 +906,39 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)) } + private def serverEndpoints(adminClient: AdminClient): String = { + val nodes = adminClient.describeCluster().nodes().get + nodes.asScala.map { node => + s"${node.host}:${node.port}" + }.mkString(",") + } + + private def alterAdvertisedListener(adminClient: AdminClient, externalAdminClient: AdminClient, oldHost: String, newHost: String): Unit = { + val configs = servers.map { server => + val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) + val newListeners = server.config.advertisedListeners.map { e => + if (e.listenerName.value == SecureExternal) + s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}" + else + s"${e.listenerName.value}://${e.host}:${server.boundPort(e.listenerName)}" + }.mkString(",") + val configEntry = new 
ConfigEntry(KafkaConfig.AdvertisedListenersProp, newListeners) + (resource, new Config(Collections.singleton(configEntry))) + }.toMap.asJava + adminClient.alterConfigs(configs).all.get + servers.foreach { server => + TestUtils.retry(10000) { + val externalListener = server.config.advertisedListeners.find(_.listenerName.value == SecureExternal) + .getOrElse(throw new IllegalStateException("External listener not found")) + assertTrue("Config not updated", externalListener.host == newHost) + } + } + val (endpoints, altered) = TestUtils.computeUntilTrue(serverEndpoints(externalAdminClient)) { endpoints => + !endpoints.contains(oldHost) + } + assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered) + } + private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = { val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava val newConfig = new Config(configEntries) @@ -701,12 +1001,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet adminZkClient.changeBrokerConfig(brokers, keystoreProps) } - private def waitForKeystore(sslProperties: Properties, maxWaitMs: Long = 10000): Unit = { - waitForConfig(new ListenerName(SecureExternal).configPrefix + SSL_KEYSTORE_LOCATION_CONFIG, - sslProperties.getProperty(SSL_KEYSTORE_LOCATION_CONFIG), maxWaitMs) - - } - private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = { servers.foreach { server => waitForConfigOnServer(server, propName, propValue, maxWaitMs) } } @@ -774,6 +1068,64 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } } + private def verifyConnectionFailure(producer: KafkaProducer[String, String]): Future[_] = { + val executor = Executors.newSingleThreadExecutor + executors += executor + val future = executor.submit(new Runnable() { + def run() { + producer.send(new ProducerRecord(topic, 
"key", "value")).get + } + }) + verifyTimeout(future) + future + } + + private def verifyConnectionFailure(consumer: KafkaConsumer[String, String]): Future[_] = { + val executor = Executors.newSingleThreadExecutor + executors += executor + val future = executor.submit(new Runnable() { + def run() { + assertEquals(0, consumer.poll(100).count) + } + }) + verifyTimeout(future) + future + } + + private def verifyTimeout(future: Future[_]): Unit = { + try { + future.get(100, TimeUnit.MILLISECONDS) + fail("Operation should not have completed") + } catch { + case _: TimeoutException => // expected exception + } + } + + private def addListenerPropsSsl(listenerName: String, props: Properties): Unit = { + val prefix = new ListenerName(listenerName).configPrefix + sslProperties1.keySet.asScala.foreach { name => + val value = sslProperties1.get(name) + val valueStr = value match { + case password: Password => password.value + case list: util.List[_] => list.asScala.map(_.toString).mkString(",") + case _ => value.toString + } + props.put(s"$prefix$name", valueStr) + } + } + + private def addListenerPropsSasl(listener: String, mechanisms: Seq[String], props: Properties): Unit = { + val listenerName = new ListenerName(listener) + val prefix = listenerName.configPrefix + props.put(prefix + KafkaConfig.SaslEnabledMechanismsProp, mechanisms.mkString(",")) + props.put(prefix + KafkaConfig.SaslKerberosServiceNameProp, "kafka") + mechanisms.foreach { mechanism => + val jaasSection = jaasSections(Seq(mechanism), None, KafkaSasl, "").head + val jaasConfig = jaasSection.modules.head.toString + props.put(listenerName.saslMechanismConfigPrefix(mechanism) + KafkaConfig.SaslJaasConfigProp, jaasConfig) + } + } + private class ProducerThread(clientId: String, retries: Int) extends ShutdownableThread(clientId, isInterruptible = false) { private val producer = createProducer(trustStoreFile1, retries, clientId) @volatile var sent = 0 @@ -877,8 +1229,10 @@ class TestMetricsReporter extends 
MetricsReporter with Reconfigurable with Close Set(PollingIntervalProp).asJava } - override def validateReconfiguration(configs: util.Map[String, _]): Boolean = { - configs.get(PollingIntervalProp).toString.toInt > 0 + override def validateReconfiguration(configs: util.Map[String, _]): Unit = { + val pollingInterval = configs.get(PollingIntervalProp).toString + if (configs.get(PollingIntervalProp).toString.toInt <= 0) + throw new ConfigException(s"Invalid polling interval $pollingInterval") } override def reconfigure(configs: util.Map[String, _]): Unit = { diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala index 666ba4b..f2c6753 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala @@ -21,25 +21,23 @@ import java.util.Properties import kafka.utils.JaasTestUtils import kafka.utils.JaasTestUtils.JaasSection -import org.apache.kafka.common.network.ListenerName class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest { import MultipleListenersWithSameSecurityProtocolBaseTest._ - override def saslProperties(listenerName: ListenerName): Properties = { - listenerName.value match { - case SecureInternal => kafkaClientSaslProperties(Plain, dynamicJaasConfig = true) - case SecureExternal => kafkaClientSaslProperties(GssApi, dynamicJaasConfig = true) - case _ => throw new IllegalArgumentException(s"Unexpected listener name $listenerName") - } + override def staticJaasSections: Seq[JaasSection] = { + val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles() + JaasTestUtils.zkSections :+ + JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", kafkaServerSaslMechanisms(SecureExternal), 
Some(serverKeytabFile)) } - override def jaasSections: Seq[JaasSection] = { - val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles() - JaasTestUtils.zkSections ++ Seq( - JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", Seq(GssApi), Some(serverKeytabFile)), - JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(Plain), None) - ) + override protected def dynamicJaasSections: Properties = { + val props = new Properties + kafkaServerSaslMechanisms(SecureInternal).foreach { mechanism => + addDynamicJaasSection(props, SecureInternal, mechanism, + JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(mechanism), None)) + } + props } } diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala index f3e1b8b..10df84e 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala @@ -26,12 +26,9 @@ import org.apache.kafka.common.network.ListenerName class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest { - import MultipleListenersWithSameSecurityProtocolBaseTest._ + override def staticJaasSections: Seq[JaasSection] = + jaasSections(kafkaServerSaslMechanisms.values.flatMap(identity).toSeq, Some(kafkaClientSaslMechanism), Both) - override def saslProperties(listenerName: ListenerName): Properties = - kafkaClientSaslProperties(Plain, dynamicJaasConfig = true) - - override def jaasSections: Seq[JaasSection] = - jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both) + override protected def dynamicJaasSections: Properties = new Properties } diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala index da8437e..50ebaa4 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala @@ -19,13 +19,13 @@ package kafka.server import java.io.File -import java.util.{Collections, Properties} +import java.util.{Collections, Objects, Properties} import java.util.concurrent.TimeUnit import kafka.api.SaslSetup import kafka.coordinator.group.OffsetConfig import kafka.utils.JaasTestUtils.JaasSection -import kafka.utils.TestUtils +import kafka.utils.{JaasTestUtils, TestUtils} import kafka.utils.Implicits._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} @@ -55,18 +55,20 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep private val trustStoreFile = File.createTempFile("truststore", ".jks") private val servers = new ArrayBuffer[KafkaServer] - private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]() - private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]() + private val producers = mutable.Map[ClientMetadata, KafkaProducer[Array[Byte], Array[Byte]]]() + private val consumers = mutable.Map[ClientMetadata, KafkaConsumer[Array[Byte], Array[Byte]]]() protected val kafkaClientSaslMechanism = Plain - protected val kafkaServerSaslMechanisms = List(GssApi, Plain) + protected val kafkaServerSaslMechanisms = Map( + SecureExternal -> Seq("SCRAM-SHA-256", GssApi), + SecureInternal -> Seq(Plain, "SCRAM-SHA-512")) - protected def saslProperties(listenerName: ListenerName): Properties - protected def jaasSections: Seq[JaasSection] + protected def staticJaasSections: Seq[JaasSection] + protected def dynamicJaasSections: Properties @Before override def setUp(): Unit = { 
- startSasl(jaasSections) + startSasl(staticJaasSections) super.setUp() // 2 brokers so that we can test that the data propagates correctly via UpdateMetadadaRequest val numServers = 2 @@ -82,8 +84,12 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep props.put(KafkaConfig.InterBrokerListenerNameProp, Internal) props.put(KafkaConfig.ZkEnableSecureAclsProp, "true") props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism) - props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(",")) + props.put(s"${new ListenerName(SecureInternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}", + kafkaServerSaslMechanisms(SecureInternal).mkString(",")) + props.put(s"${new ListenerName(SecureExternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}", + kafkaServerSaslMechanisms(SecureExternal).mkString(",")) props.put(KafkaConfig.SaslKerberosServiceNameProp, "kafka") + props ++= dynamicJaasSections props ++= TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId") @@ -107,26 +113,37 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions, replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs) + createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) + servers.head.config.listeners.foreach { endPoint => val listenerName = endPoint.listenerName - TestUtils.createTopic(zkClient, listenerName.value, 2, 2, servers) - val trustStoreFile = if (TestUtils.usesSslTransportLayer(endPoint.securityProtocol)) Some(this.trustStoreFile) else None - val saslProps = - if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) Some(saslProperties(listenerName)) - else None - val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName) - 
producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1, - securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps) + def addProducerConsumer(listenerName: ListenerName, mechanism: String, saslProps: Option[Properties]): Unit = { + + val topic = s"${listenerName.value}${producers.size}" + TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val clientMetadata = ClientMetadata(listenerName, mechanism, topic) + + producers(clientMetadata) = TestUtils.createNewProducer(bootstrapServers, acks = -1, + securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps) + + consumers(clientMetadata) = TestUtils.createNewConsumer(bootstrapServers, groupId = clientMetadata.toString, + securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps) + } - consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value, - securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps) + if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) { + kafkaServerSaslMechanisms(endPoint.listenerName.value).foreach { mechanism => + addProducerConsumer(listenerName, mechanism, Some(kafkaClientSaslProperties(mechanism, dynamicJaasConfig = true))) + } + } else { + addProducerConsumer(listenerName, "", saslProps = None) + } } } @@ -145,18 +162,34 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep */ @Test def testProduceConsume(): Unit = { - producers.foreach { case (listenerName, producer) => - val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes, + producers.foreach { case (clientMetadata, producer) => + val producerRecords = (1 to 10).map(i => new ProducerRecord(clientMetadata.topic, s"key$i".getBytes, s"value$i".getBytes)) 
producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS)) - val consumer = consumers(listenerName) - consumer.subscribe(Collections.singleton(listenerName.value)) + val consumer = consumers(clientMetadata) + consumer.subscribe(Collections.singleton(clientMetadata.topic)) val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]] TestUtils.waitUntilTrue(() => { records ++= consumer.poll(50).asScala records.size == producerRecords.size - }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records") + }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records with mechanism ${clientMetadata.saslMechanism}") + } + } + + protected def addDynamicJaasSection(props: Properties, listener: String, mechanism: String, jaasSection: JaasSection): Unit = { + val listenerName = new ListenerName(listener) + val prefix = listenerName.saslMechanismConfigPrefix(mechanism) + val jaasConfig = jaasSection.modules.head.toString + props.put(s"${prefix}${KafkaConfig.SaslJaasConfigProp}", jaasConfig) + } + + case class ClientMetadata(val listenerName: ListenerName, val saslMechanism: String, topic: String) { + override def hashCode: Int = Objects.hash(listenerName, saslMechanism) + override def equals(obj: Any): Boolean = obj match { + case other: ClientMetadata => listenerName == other.listenerName && saslMechanism == other.saslMechanism && topic == other.topic + case _ => false } + override def toString: String = s"${listenerName.value}:$saslMechanism:$topic" } } diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index c934c4a..18be167 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -87,13 +87,13 @@ class KafkaTest { val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", 
"ssl.keystore.password=keystore_password", "--override", "ssl.key.password=key_password", "--override", "ssl.truststore.password=truststore_password"))) - assertEquals(Password.HIDDEN, config.sslKeyPassword.toString) - assertEquals(Password.HIDDEN, config.sslKeystorePassword.toString) - assertEquals(Password.HIDDEN, config.sslTruststorePassword.toString) + assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeyPasswordProp).toString) + assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystorePasswordProp).toString) + assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslTruststorePasswordProp).toString) - assertEquals("key_password", config.sslKeyPassword.value) - assertEquals("keystore_password", config.sslKeystorePassword.value) - assertEquals("truststore_password", config.sslTruststorePassword.value) + assertEquals("key_password", config.getPassword(KafkaConfig.SslKeyPasswordProp).value) + assertEquals("keystore_password", config.getPassword(KafkaConfig.SslKeystorePasswordProp).value) + assertEquals("truststore_password", config.getPassword(KafkaConfig.SslTruststorePasswordProp).value) } def prepareDefaultConfig(): String = { diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index 6e78423..b3c46fa 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -27,10 +27,10 @@ import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness} import org.apache.kafka.clients.admin._ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.internals.KafkaFutureImpl -import org.apache.kafka.common.{KafkaFuture, Node} +import org.apache.kafka.common.Node import org.apache.kafka.common.security.scram.ScramCredentialUtils import org.apache.kafka.common.utils.Sanitizer -import org.easymock.{EasyMock, IAnswer} +import org.easymock.EasyMock 
import org.junit.Assert._ import org.junit.Test diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 13299c7..724e05b 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -38,6 +38,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.security.scram.ScramMechanism import org.apache.kafka.common.utils.{LogContext, MockTime, Time} import org.apache.log4j.Level import org.junit.Assert._ @@ -61,7 +62,7 @@ class SocketServerTest extends JUnitSuite { props.put("connections.max.idle.ms", "60000") val config = KafkaConfig.fromProps(props) val metrics = new Metrics - val credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, null) + val credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, null) val localAddress = InetAddress.getLoopbackAddress // Clean-up any metrics left around by previous tests @@ -406,7 +407,7 @@ class SocketServerTest extends JUnitSuite { // the following sleep is necessary to reliably detect the connection close when we send data below Thread.sleep(200L) // make sure the sockets are open - server.acceptors.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed)) + server.acceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed)) // then shutdown the server shutdownServerAndMetrics(server) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 6dedbe0..2e3e274 100755 --- 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -17,13 +17,18 @@ package kafka.server +import java.util import java.util.Properties import kafka.utils.TestUtils +import org.apache.kafka.common.Reconfigurable +import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.config.{ConfigException, SslConfigs} import org.junit.Assert._ import org.junit.Test +import scala.collection.JavaConverters._ + class DynamicBrokerConfigTest { @Test @@ -34,7 +39,7 @@ class DynamicBrokerConfigTest { val config = KafkaConfig(props) val dynamicConfig = config.dynamicConfig assertSame(config, dynamicConfig.currentKafkaConfig) - assertEquals(oldKeystore, config.sslKeystoreLocation) + assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertEquals(oldKeystore, config.valuesFromThisConfigWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertEquals(oldKeystore, config.originalsFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) @@ -55,7 +60,7 @@ class DynamicBrokerConfigTest { assertEquals(newKeystore, config.originalsWithPrefix("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) - assertEquals(oldKeystore, config.sslKeystoreLocation) + assertEquals(oldKeystore, config.getString(KafkaConfig.SslKeystoreLocationProp)) assertEquals(oldKeystore, config.originals.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertEquals(oldKeystore, config.originalsStrings.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) @@ -75,38 +80,48 @@ class DynamicBrokerConfigTest { origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS") val config = KafkaConfig(origProps) - def verifyConfigUpdateWithInvalidConfig(validProps: Map[String, String], invalidProps: Map[String, String]): Unit = { - val props = new 
Properties - validProps.foreach { case (k, v) => props.put(k, v) } - invalidProps.foreach { case (k, v) => props.put(k, v) } - - // DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in - // in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid. - try { - config.dynamicConfig.validate(props, perBrokerConfig = true) - fail("Invalid config did not fail validation") - } catch { - case e: ConfigException => // expected exception - } - - // DynamicBrokerConfig#updateBrokerConfig is used to update configs from ZooKeeper during - // startup and when configs are updated in ZK. Update should apply valid configs and ignore - // invalid ones. - config.dynamicConfig.updateBrokerConfig(0, props) - validProps.foreach { case (name, value) => assertEquals(value, config.originals.get(name)) } - invalidProps.keySet.foreach { name => - assertEquals(origProps.get(name), config.originals.get(name)) - } - } + val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12") - val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" ->"ks.p12") val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12") - verifyConfigUpdateWithInvalidConfig(validProps, securityPropsWithoutListenerPrefix) + verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix) val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181") - verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps) + verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps) + // Test update of configs with invalid type val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid") - verifyConfigUpdateWithInvalidConfig(validProps, invalidProps) + verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps) + } + + @Test + def 
testConfigUpdateWithReconfigurableValidationFailure(): Unit = { + val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + origProps.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "100000000") + val config = KafkaConfig(origProps) + val validProps = Map.empty[String, String] + val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "20") + + def validateLogCleanerConfig(configs: util.Map[String, _]): Unit = { + val cleanerThreads = configs.get(KafkaConfig.LogCleanerThreadsProp).toString.toInt + if (cleanerThreads <=0 || cleanerThreads >= 5) + throw new ConfigException(s"Invalid cleaner threads $cleanerThreads") + } + val reconfigurable = new Reconfigurable { + override def configure(configs: util.Map[String, _]): Unit = {} + override def reconfigurableConfigs(): util.Set[String] = Set(KafkaConfig.LogCleanerThreadsProp).asJava + override def validateReconfiguration(configs: util.Map[String, _]): Unit = validateLogCleanerConfig(configs) + override def reconfigure(configs: util.Map[String, _]): Unit = {} + } + config.dynamicConfig.addReconfigurable(reconfigurable) + verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps) + config.dynamicConfig.removeReconfigurable(reconfigurable) + + val brokerReconfigurable = new BrokerReconfigurable { + override def reconfigurableConfigs: collection.Set[String] = Set(KafkaConfig.LogCleanerThreadsProp) + override def validateReconfiguration(newConfig: KafkaConfig): Unit = validateLogCleanerConfig(newConfig.originals) + override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {} + } + config.dynamicConfig.addBrokerReconfigurable(brokerReconfigurable) + verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps) } @Test @@ -151,4 +166,86 @@ class DynamicBrokerConfigTest { assertEquals(oldValue, config.originals.get(name)) } } + + private def verifyConfigUpdateWithInvalidConfig(config: KafkaConfig, + origProps: Properties, + 
validProps: Map[String, String], + invalidProps: Map[String, String]): Unit = { + val props = new Properties + validProps.foreach { case (k, v) => props.put(k, v) } + invalidProps.foreach { case (k, v) => props.put(k, v) } + + // DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in + // in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid. + try { + config.dynamicConfig.validate(props, perBrokerConfig = true) + fail("Invalid config did not fail validation") + } catch { + case e: ConfigException => // expected exception + } + + // DynamicBrokerConfig#updateBrokerConfig is used to update configs from ZooKeeper during + // startup and when configs are updated in ZK. Update should apply valid configs and ignore + // invalid ones. + config.dynamicConfig.updateBrokerConfig(0, props) + validProps.foreach { case (name, value) => assertEquals(value, config.originals.get(name)) } + invalidProps.keySet.foreach { name => + assertEquals(origProps.get(name), config.originals.get(name)) + } + } + + @Test + def testPasswordConfigEncryption(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + val configWithoutSecret = KafkaConfig(props) + props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret") + val configWithSecret = KafkaConfig(props) + val dynamicProps = new Properties + dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "myLoginModule required;") + + try { + configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true) + } catch { + case e: ConfigException => // expected exception + } + val persistedProps = configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true) + assertFalse("Password not encoded", + persistedProps.getProperty(KafkaConfig.SaslJaasConfigProp).contains("myLoginModule")) + val decodedProps = configWithSecret.dynamicConfig.fromPersistentProps(persistedProps, 
perBrokerConfig = true) + assertEquals("myLoginModule required;", decodedProps.getProperty(KafkaConfig.SaslJaasConfigProp)) + } + + @Test + def testPasswordConfigEncoderSecretChange(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + props.put(KafkaConfig.SaslJaasConfigProp, "staticLoginModule required;") + props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret") + val config = KafkaConfig(props) + val dynamicProps = new Properties + dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "dynamicLoginModule required;") + + val persistedProps = config.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true) + assertFalse("Password not encoded", + persistedProps.getProperty(KafkaConfig.SaslJaasConfigProp).contains("LoginModule")) + config.dynamicConfig.updateBrokerConfig(0, persistedProps) + assertEquals("dynamicLoginModule required;", config.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value) + + // New config with same secret should use the dynamic password config + val newConfigWithSameSecret = KafkaConfig(props) + newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) + assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value) + + // New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig + props.put(KafkaConfig.PasswordEncoderSecretProp, "new-encoder-secret") + props.put(KafkaConfig.PasswordEncoderOldSecretProp, "config-encoder-secret") + val newConfigWithNewAndOldSecret = KafkaConfig(props) + newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) + assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value) + + // New config with new secret alone should revert to static password config since dynamic 
config cannot be decoded + props.put(KafkaConfig.PasswordEncoderSecretProp, "another-new-encoder-secret") + val newConfigWithNewSecret = KafkaConfig(props) + newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) + assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value) + } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 748efec..6b26334 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -674,12 +674,22 @@ class KafkaConfigTest { case KafkaConfig.SaslKerberosTicketRenewJitterProp => case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp => case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string + case KafkaConfig.SaslJaasConfigProp => + + // Password encoder configs + case KafkaConfig.PasswordEncoderSecretProp => + case KafkaConfig.PasswordEncoderOldSecretProp => + case KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp => + case KafkaConfig.PasswordEncoderCipherAlgorithmProp => + case KafkaConfig.PasswordEncoderKeyLengthProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") + case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") //delegation token configs case KafkaConfig.DelegationTokenMasterKeyProp => // ignore case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") } }) 
diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala new file mode 100755 index 0000000..11a2a7a --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.utils + + +import javax.crypto.SecretKeyFactory + +import kafka.server.Defaults +import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.config.types.Password +import org.apache.kafka.common.utils.Java +import org.junit.Assert._ +import org.junit.Test + +class PasswordEncoderTest { + + @Test + def testEncodeDecode(): Unit = { + val encoder = new PasswordEncoder(new Password("password-encoder-secret"), + None, + Defaults.PasswordEncoderCipherAlgorithm, + Defaults.PasswordEncoderKeyLength, + Defaults.PasswordEncoderIterations) + val password = "test-password" + val encoded = encoder.encode(new Password(password)) + val encodedMap = CoreUtils.parseCsvMap(encoded) + assertEquals("4096", encodedMap(PasswordEncoder.IterationsProp)) + assertEquals("128", encodedMap(PasswordEncoder.KeyLengthProp)) + val defaultKeyFactoryAlgorithm = try { + SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512") + "PBKDF2WithHmacSHA512" + } catch { + case _: Exception => "PBKDF2WithHmacSHA1" + } + assertEquals(defaultKeyFactoryAlgorithm, encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp)) + assertEquals("AES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp)) + + verifyEncodedPassword(encoder, password, encoded) + } + + @Test + def testEncoderConfigChange(): Unit = { + val encoder = new PasswordEncoder(new Password("password-encoder-secret"), + Some("PBKDF2WithHmacSHA1"), + "DES/CBC/PKCS5Padding", + 64, + 1024) + val password = "test-password" + val encoded = encoder.encode(new Password(password)) + val encodedMap = CoreUtils.parseCsvMap(encoded) + assertEquals("1024", encodedMap(PasswordEncoder.IterationsProp)) + assertEquals("64", encodedMap(PasswordEncoder.KeyLengthProp)) + assertEquals("PBKDF2WithHmacSHA1", encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp)) + assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp)) + + // Test that decoding works even if PasswordEncoder algorithm, iterations 
etc. are altered + val decoder = new PasswordEncoder(new Password("password-encoder-secret"), + Some("PBKDF2WithHmacSHA1"), + "AES/CBC/PKCS5Padding", + 128, + 2048) + assertEquals(password, decoder.decode(encoded).value) + + // Test that decoding fails if secret is altered + val decoder2 = new PasswordEncoder(new Password("secret-2"), + Some("PBKDF2WithHmacSHA1"), + "AES/CBC/PKCS5Padding", + 128, + 1024) + try { + decoder2.decode(encoded) + } catch { + case e: ConfigException => // expected exception + } + } + + @Test + def testEncodeDecodeAlgorithms(): Unit = { + + def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = { + val encoder = new PasswordEncoder(new Password("password-encoder-secret"), + keyFactoryAlg, + cipherAlg, + keyLength, + Defaults.PasswordEncoderIterations) + val password = "test-password" + val encoded = encoder.encode(new Password(password)) + verifyEncodedPassword(encoder, password, encoded) + } + + verifyEncodeDecode(keyFactoryAlg = None, "DES/CBC/PKCS5Padding", keyLength = 64) + verifyEncodeDecode(keyFactoryAlg = None, "DESede/CBC/PKCS5Padding", keyLength = 192) + verifyEncodeDecode(keyFactoryAlg = None, "AES/CBC/PKCS5Padding", keyLength = 128) + verifyEncodeDecode(keyFactoryAlg = None, "AES/CFB/PKCS5Padding", keyLength = 128) + verifyEncodeDecode(keyFactoryAlg = None, "AES/OFB/PKCS5Padding", keyLength = 128) + verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA1"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128) + if (Java.IS_JAVA8_COMPATIBLE) { + verifyEncodeDecode(keyFactoryAlg = None, "AES/GCM/PKCS5Padding", keyLength = 128) + verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA256"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128) + verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA512"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128) + } + } + + private def verifyEncodedPassword(encoder: PasswordEncoder, password: String, encoded: String): 
Unit = { + val encodedMap = CoreUtils.parseCsvMap(encoded) + assertEquals(password.length.toString, encodedMap(PasswordEncoder.PasswordLengthProp)) + assertNotNull("Invalid salt", encoder.base64Decode(encodedMap("salt"))) + assertNotNull("Invalid encoding parameters", encoder.base64Decode(encodedMap(PasswordEncoder.InitializationVectorProp))) + assertNotNull("Invalid encoded password", encoder.base64Decode(encodedMap(PasswordEncoder.EncyrptedPasswordProp))) + assertEquals(password, encoder.decode(encoded).value) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 407bdb5..303afa7 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -718,7 +718,7 @@ object TestUtils extends Logging { def deleteBrokersInZk(zkClient: KafkaZkClient, ids: Seq[Int]): Seq[Broker] = { val brokers = ids.map(createBroker(_, "localhost", 6667, SecurityProtocol.PLAINTEXT)) - brokers.foreach(b => zkClient.deletePath(BrokerIdsZNode.path + "/" + b)) + ids.foreach(b => zkClient.deletePath(BrokerIdsZNode.path + "/" + b)) brokers } -- To stop receiving notification emails like this one, please contact jgus@apache.org.